Commit Graph

685 Commits

Author SHA1 Message Date
Michael Klishin eb261acd30
CLI: update guide URLs to use the new path structure
the original paths, e.g. /streams.html, do have redirects
in place but it turned out to be a surprisingly fragile
Cloudflare feature when there are hundreds of them,
so we better switch now.
2024-03-07 15:53:14 -05:00
David Ansari b0142287c7 Protect receiving app from being overloaded
What?

Protect receiving application from being overloaded with new messages
while still processing existing messages if the auto credit renewal
feature of the Erlang AMQP 1.0 client library is used.

This feature can therefore be thought of as a prefetch window equivalent
in AMQP 0.9.1 or MQTT 5.0 property Receive Maximum.

How?

The credit auto renewal feature in RabbitMQ 3.x was wrongly implemented.
This commit takes the same approach as done in the server:
The incoming_unsettled map is hold in the link instead of in the session
to accurately and quickly determine the number of unsettled messages for
a receiving link.

The amqp10_client lib will grant more credits to the sender when the sum
of remaining link credits and number of unsettled deliveries falls below
the threshold RenewWhenBelow.

This avoids maintaning additional state like the `link_credit_unsettled`
or an alternative delivery_count_settled sequence number which is more
complex to implement correctly.

This commit breaks the amqp10_client_session:disposition/6 API:
This commit forces the client application to only range settle for a
given link, i.e. not across multiple links on a given session at once.
The latter is allowed according to the AMQP spec.
2024-02-28 14:15:20 +01:00
David Ansari 8cb313d5a1 Support AMQP 1.0 natively
## What

Similar to Native MQTT in #5895, this commits implements Native AMQP 1.0.
By "native", we mean do not proxy via AMQP 0.9.1 anymore.

  ## Why

Native AMQP 1.0 comes with the following major benefits:
1. Similar to Native MQTT, this commit provides better throughput, latency,
   scalability, and resource usage for AMQP 1.0.
   See https://blog.rabbitmq.com/posts/2023/03/native-mqtt for native MQTT improvements.
   See further below for some benchmarks.
2. Since AMQP 1.0 is not limited anymore by the AMQP 0.9.1 protocol,
   this commit allows implementing more AMQP 1.0 features in the future.
   Some features are already implemented in this commit (see next section).
3. Simpler, better understandable, and more maintainable code.

Native AMQP 1.0 as implemented in this commit has the
following major benefits compared to AMQP 0.9.1:
4. Memory and disk alarms will only stop accepting incoming TRANSFER frames.
   New connections can still be created to consume from RabbitMQ to empty queues.
5. Due to 4. no need anymore for separate connections for publishers and
   consumers as we currently recommended for AMQP 0.9.1. which potentially
   halves the number of physical TCP connections.
6. When a single connection sends to multiple target queues, a single
   slow target queue won't block the entire connection.
   Publisher can still send data quickly to all other target queues.
7. A publisher can request whether it wants publisher confirmation on a per-message basis.
   In AMQP 0.9.1 publisher confirms are configured per channel only.
8. Consumers can change their "prefetch count" dynamically which isn't
   possible in our AMQP 0.9.1 implementation. See #10174
9. AMQP 1.0 is an extensible protocol

This commit also fixes dozens of bugs present in the AMQP 1.0 plugin in
RabbitMQ 3.x - most of which cannot be backported due to the complexity
and limitations of the old 3.x implementation.

This commit contains breaking changes and is therefore targeted for RabbitMQ 4.0.

 ## Implementation details

1. Breaking change: With Native AMQP, the behaviour of
```
Convert AMQP 0.9.1 message headers to application properties for an AMQP 1.0 consumer
amqp1_0.convert_amqp091_headers_to_app_props = false | true (default false)
Convert AMQP 1.0 Application Properties to AMQP 0.9.1 headers
amqp1_0.convert_app_props_to_amqp091_headers = false | true (default false)
```
will break because we always convert according to the message container conversions.
For example, AMQP 0.9.1 x-headers will go into message-annotations instead of application properties.
Also, `false` won’t be respected since we always convert the headers with message containers.

2. Remove rabbit_queue_collector

rabbit_queue_collector is responsible for synchronously deleting
exclusive queues. Since the AMQP 1.0 plugin never creates exclusive
queues, rabbit_queue_collector doesn't need to be started in the first
place. This will save 1 Erlang process per AMQP 1.0 connection.

3. 7 processes per connection + 1 process per session in this commit instead of
   7 processes per connection + 15 processes per session in 3.x
Supervision hierarchy got re-designed.

4. Use 1 writer process per AMQP 1.0 connection
AMQP 0.9.1 uses a separate rabbit_writer Erlang process per AMQP 0.9.1 channel.
Prior to this commit, AMQP 1.0 used a separate rabbit_amqp1_0_writer process per AMQP 1.0 session.
Advantage of single writer proc per session (prior to this commit):
* High parallelism for serialising packets if multiple sessions within
  a connection write heavily at the same time.

This commit uses a single writer process per AMQP 1.0 connection that is
shared across all AMQP 1.0 sessions.
Advantages of single writer proc per connection (this commit):
* Lower memory usage with hundreds of thousands of AMQP 1.0 sessions
* Less TCP and IP header overhead given that the single writer process
  can accumulate across all sessions bytes before flushing the socket.

In other words, this commit decides that a reader / writer process pair
per AMQP 1.0 connection is good enough for bi-directional TRANSFER flows.
Having a writer per session is too heavy.
We still ensure high throughput by having separate reader, writer, and
session processes.

5. Transform rabbit_amqp1_0_writer into gen_server
Why:
Prior to this commit, when clicking on the AMQP 1.0 writer process in
observer, the process crashed.
Instead of handling all these debug messages of the sys module, it's better
to implement a gen_server.
There is no advantage of using a special OTP process over gen_server
for the AMQP 1.0 writer.
gen_server also provides cleaner format status output.

How:
Message callbacks return a timeout of 0.
After all messages in the inbox are processed, the timeout message is
handled by flushing any pending bytes.

6. Remove stats timer from writer
AMQP 1.0 connections haven't emitted any stats previously.

7. When there are contiguous queue confirmations in the session process
mailbox, batch them. When the confirmations are sent to the publisher, a
single DISPOSITION frame is sent for contiguously confirmed delivery
IDs.
This approach should be good enough. However it's sub optimal in
scenarios where contiguous delivery IDs that need confirmations are rare,
for example:
* There are multiple links in the session with different sender
  settlement modes and sender publishes across these links interleaved.
* sender settlement mode is mixed and sender publishes interleaved settled
  and unsettled TRANSFERs.

8. Introduce credit API v2
Why:
The AMQP 0.9.1 credit extension which is to be removed in 4.0 was poorly
designed since basic.credit is a synchronous call into the queue process
blocking the entire AMQP 1.0 session process.

How:
Change the interactions between queue clients and queue server
implementations:
* Clients only request a credit reply if the FLOW's `echo` field is set
* Include all link flow control state held by the queue process into a
  new credit_reply queue event:
  * `available` after the queue sends any deliveries
  * `link-credit` after the queue sends any deliveries
  * `drain` which allows us to combine the old queue events
    send_credit_reply and send_drained into a single new queue event
    credit_reply.
* Include the consumer tag into the credit_reply queue event such that
  the AMQP 1.0 session process can process any credit replies
  asynchronously.

Link flow control state `delivery-count` also moves to the queue processes.

The new interactions are hidden behind feature flag credit_api_v2 to
allow for rolling upgrades from 3.13 to 4.0.

9. Use serial number arithmetic in quorum queues and session process.

10. Completely bypass the rabbit_limiter module for AMQP 1.0
flow control. The goal is to eventually remove the rabbit_limiter module
in 4.0 since AMQP 0.9.1 global QoS will be unsupported in 4.0. This
commit lifts the AMQP 1.0 link flow control logic out of rabbit_limiter
into rabbit_queue_consumers.

11. Fix credit bug for streams:
AMQP 1.0 settlements shouldn't top up link credit,
only FLOW frames should top up link credit.

12. Allow sender settle mode unsettled for streams
since AMQP 1.0 acknowledgements to streams are no-ops (currently).

13. Fix AMQP 1.0 client bugs
Auto renewing credits should not be related to settling TRANSFERs.
Remove field link_credit_unsettled as it was wrong and confusing.
Prior to this commit auto renewal did not work when the sender uses
sender settlement mode settled.

14. Fix AMQP 1.0 client bugs
The wrong outdated Link was passed to function auto_flow/2

15. Use osiris chunk iterator
Only hold messages of uncompressed sub batches in memory if consumer
doesn't have sufficient credits.
Compressed sub batches are skipped for non Stream protocol consumers.

16. Fix incoming link flow control
Always use confirms between AMQP 1.0 queue clients and queue servers.
As already done internally by rabbit_fifo_client and
rabbit_stream_queue, use confirms for classic queues as well.

17. Include link handle into correlation when publishing messages to target queues
such that session process can correlate confirms from target queues to
incoming links.

18. Only grant more credits to publishers if publisher hasn't sufficient credits
anymore and there are not too many unconfirmed messages on the link.

19. Completely ignore `block` and `unblock` queue actions and RabbitMQ credit flow
between classic queue process and session process.

20. Link flow control is independent between links.
A client can refer to a queue or to an exchange with multiple
dynamically added target queues. Multiple incoming links can also fan
in to the same queue. However the link topology looks like, this
commit ensures that each link is only granted more credits if that link
isn't overloaded.

21. A connection or a session can send to many different queues.
In AMQP 0.9.1, a single slow queue will lead to the entire channel, and
then entire connection being blocked.
This commit makes sure that a single slow queue from one link won't slow
down sending on other links.
For example, having link A sending to a local classic queue and
link B sending to 5 replica quorum queue, link B will naturally
grant credits slower than link A. So, despite the quorum queue being
slower in confirming messages, the same AMQP 1.0 connection and session
can still pump data very fast into the classic queue.

22. If cluster wide memory or disk alarm occurs.
Each session sends a FLOW with incoming-window to 0 to sending client.
If sending clients don’t obey, force disconnect the client.

If cluster wide memory alarm clears:
Each session resumes with a FLOW defaulting to initial incoming-window.

23. All operations apart of publishing TRANSFERS to RabbitMQ can continue during cluster wide alarms,
specifically, attaching consumers and consuming, i.e. emptying queues.
There is no need for separate AMQP 1.0 connections for publishers and consumers as recommended in our AMQP 0.9.1 implementation.

24. Flow control summary:
* If queue becomes bottleneck, that’s solved by slowing down individual sending links (AMQP 1.0 link flow control).
* If session becomes bottleneck (more unlikely), that’s solved by AMQP 1.0 session flow control.
* If connection becomes bottleneck, it naturally won’t read fast enough from the socket causing TCP backpressure being applied.
Nowhere will RabbitMQ internal credit based flow control (i.e. module credit_flow) be used on the incoming AMQP 1.0 message path.

25. Register AMQP sessions
Prefer local-only pg over our custom pg_local implementation as
pg is a better process group implementation than pg_local.
pg_local was identified as bottleneck in tests where many MQTT clients were disconnected at once.

26. Start a local-only pg when Rabbit boots:
> A scope can be kept local-only by using a scope name that is unique cluster-wide, e.g. the node name:
> pg:start_link(node()).
Register AMQP 1.0 connections and sessions with pg.

In future we should remove pg_local and instead use the new local-only
pg for all registered processes such as AMQP 0.9.1 connections and channels.

27. Requeue messages if link detached
Although the spec allows to settle delivery IDs on detached links, RabbitMQ does not respect the 'closed'
field of the DETACH frame and therefore handles every DETACH frame as closed. Since the link is closed,
we expect every outstanding delivery to be requeued.
In addition to consumer cancellation, detaching a link therefore causes in flight deliveries to be requeued.
Note that this behaviour is different from merely consumer cancellation in AMQP 0.9.1:
"After a consumer is cancelled there will be no future deliveries dispatched to it. Note that there can
still be "in flight" deliveries dispatched previously. Cancelling a consumer will neither discard nor requeue them."
[https://www.rabbitmq.com/consumers.html#unsubscribing]
An AMQP receiver can first drain, and then detach to prevent "in flight" deliveries

28. Init AMQP session with BEGIN frame
Similar to how there can't be an MQTT processor without a CONNECT
frame, there can't be an AMQP session without a BEGIN frame.
This allows having strict dialyzer types for session flow control
fields (i.e. not allowing 'undefined').

29. Move serial_number to AMQP 1.0 common lib
such that it can be used by both AMQP 1.0 server and client

30. Fix AMQP client to do serial number arithmetic.

31. AMQP client: Differentiate between delivery-id and transfer-id for better
understandability.

32. Fix link flow control in classic queues
This commit fixes
```
java -jar target/perf-test.jar -ad false -f persistent -u cq -c 3000 -C 1000000 -y 0
```
followed by
```
./omq -x 0 amqp -T /queue/cq -D 1000000 --amqp-consumer-credits 2
```
Prior to this commit, (and on RabbitMQ 3.x) the consuming would halt after around
8 - 10,000 messages.

The bug was that in flight messages from classic queue process to
session process were not taken into account when topping up credit to
the classic queue process.
Fixes #2597

The solution to this bug (and a much cleaner design anyway independent of
this bug) is that queues should hold all link flow control state including
the delivery-count.

Hence, when credit API v2 is used the delivery-count will be held by the
classic queue process, quorum queue process, and stream queue client
instead of managing the delivery-count in the session.

33. The double level crediting between (a) session process and
rabbit_fifo_client, and (b) rabbit_fifo_client and rabbit_fifo was
removed. Therefore, instead of managing 3 separate delivery-counts (i. session,
ii. rabbit_fifo_client, iii. rabbit_fifo), only 1 delivery-count is used
in rabbit_fifo. This is a big simplification.

34. This commit fixes quorum queues without bumping the machine version
nor introducing new rabbit_fifo commands.

Whether credit API v2 is used is solely determined at link attachment time
depending on whether feature flag credit_api_v2 is enabled.

Even when that feature flag will be enabled later on, this link will
keep using credit API v1 until detached (or the node is shut down).

Eventually, after feature flag credit_api_v2 has been enabled and a
subsequent rolling upgrade, all links will use credit API v2.

This approach is safe and simple.

The 2 alternatives to move delivery-count from the session process to the
queue processes would have been:

i. Explicit feature flag credit_api_v2 migration function
* Can use a gen_server:call and only finish migration once all delivery-counts were migrated.
Cons:
* Extra new message format just for migration is required.
* Risky as migration will fail if a target queue doesn’t reply.

ii. Session always includes DeliveryCountSnd when crediting to the queue:
Cons:
* 2 delivery counts will be hold simultaneously in session proc and queue proc;
could be solved by deleting the session proc’s delivery-count for credit-reply
* What happens if the receiver doesn’t provide credit for a very long time? Is that a problem?

35. Support stream filtering in AMQP 1.0 (by @acogoluegnes)
Use the x-stream-filter-value message annotation
to carry the filter value in a published message.
Use the rabbitmq:stream-filter and rabbitmq:stream-match-unfiltered
filters when creating a receiver that wants to filter
out messages from a stream.

36. Remove credit extension from AMQP 0.9.1 client

37. Support maintenance mode closing AMQP 1.0 connections.

38. Remove AMQP 0.9.1 client dependency from AMQP 1.0 implementation.

39. Move AMQP 1.0 plugin to the core. AMQP 1.0 is enabled by default.
    The old rabbitmq_amqp1_0 plugin will be kept as a no-op plugin to prevent deployment
    tools from failing that execute:
```
rabbitmq-plugins enable rabbitmq_amqp1_0
rabbitmq-plugins disable rabbitmq_amqp1_0
```

40. Breaking change: Remove CLI command `rabbitmqctl list_amqp10_connections`.
Instead, list both AMQP 0.9.1 and AMQP 1.0 connections in `list_connections`:
```
rabbitmqctl list_connections protocol
Listing connections ...
protocol
{1, 0}
{0,9,1}
```

 ## Benchmarks

 ### Throughput & Latency

Setup:
* Single node Ubuntu 22.04
* Erlang 26.1.1

Start RabbitMQ:
```
make run-broker PLUGINS="rabbitmq_management rabbitmq_amqp1_0" FULL=1 RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+S 3"
```

Predeclare durable classic queue cq1, durable quorum queue qq1, durable stream queue sq1.

Start client:
https://github.com/ssorj/quiver
https://hub.docker.com/r/ssorj/quiver/tags (digest 453a2aceda64)
```
docker run -it --rm --add-host host.docker.internal:host-gateway ssorj/quiver:latest
bash-5.1# quiver --version
quiver 0.4.0-SNAPSHOT
```

1. Classic queue
```
quiver //host.docker.internal//amq/queue/cq1 --durable --count 1m --duration 10m --body-size 12 --credit 1000
```

This commit:
```
Count ............................................. 1,000,000 messages
Duration ............................................... 73.8 seconds
Sender rate .......................................... 13,548 messages/s
Receiver rate ........................................ 13,547 messages/s
End-to-end rate ...................................... 13,547 messages/s

Latencies by percentile:

          0% ........ 0 ms       90.00% ........ 9 ms
         25% ........ 2 ms       99.00% ....... 14 ms
         50% ........ 4 ms       99.90% ....... 17 ms
        100% ....... 26 ms       99.99% ....... 24 ms
```

RabbitMQ 3.x (main branch as of 30 January 2024):
```
---------------------- Sender -----------------------  --------------------- Receiver ----------------------  --------
Time [s]      Count [m]  Rate [m/s]  CPU [%]  RSS [M]  Time [s]      Count [m]  Rate [m/s]  CPU [%]  RSS [M]  Lat [ms]
-----------------------------------------------------  -----------------------------------------------------  --------
     2.1        130,814      65,342        6     73.6       2.1          3,217       1,607        0      8.0       511
     4.1        163,580      16,367        2     74.1       4.1          3,217           0        0      8.0         0
     6.1        229,114      32,767        3     74.1       6.1          3,217           0        0      8.0         0
     8.1        261,880      16,367        2     74.1       8.1         67,874      32,296        8      8.2     7,662
    10.1        294,646      16,367        2     74.1      10.1         67,874           0        0      8.2         0
    12.1        360,180      32,734        3     74.1      12.1         67,874           0        0      8.2         0
    14.1        392,946      16,367        3     74.1      14.1         68,604         365        0      8.2    12,147
    16.1        458,480      32,734        3     74.1      16.1         68,604           0        0      8.2         0
    18.1        491,246      16,367        2     74.1      18.1         68,604           0        0      8.2         0
    20.1        556,780      32,767        4     74.1      20.1         68,604           0        0      8.2         0
    22.1        589,546      16,375        2     74.1      22.1         68,604           0        0      8.2         0
receiver timed out
    24.1        622,312      16,367        2     74.1      24.1         68,604           0        0      8.2         0
quiver:  error: PlanoProcessError: Command 'quiver-arrow receive //host.docker.internal//amq/queue/cq1 --impl qpid-proton-c --duration 10m --count 1m --rate 0 --body-size 12 --credit 1000 --transaction-size 0 --timeout 10 --durable --output /tmp/quiver-otujr23y' returned non-zero exit status 1.
Traceback (most recent call last):
  File "/usr/local/lib/quiver/python/quiver/pair.py", line 144, in run
    _plano.wait(receiver, check=True)
  File "/usr/local/lib/quiver/python/plano/main.py", line 1243, in wait
    raise PlanoProcessError(proc)
plano.main.PlanoProcessError: Command 'quiver-arrow receive //host.docker.internal//amq/queue/cq1 --impl qpid-proton-c --duration 10m --count 1m --rate 0 --body-size 12 --credit 1000 --transaction-size 0 --timeout 10 --durable --output /tmp/quiver-otujr23y' returned non-zero exit status 1.
```

2. Quorum queue:
```
quiver //host.docker.internal//amq/queue/qq1 --durable --count 1m --duration 10m --body-size 12 --credit 1000
```
This commit:
```
Count ............................................. 1,000,000 messages
Duration .............................................. 101.4 seconds
Sender rate ........................................... 9,867 messages/s
Receiver rate ......................................... 9,868 messages/s
End-to-end rate ....................................... 9,865 messages/s

Latencies by percentile:

          0% ....... 11 ms       90.00% ....... 23 ms
         25% ....... 15 ms       99.00% ....... 28 ms
         50% ....... 18 ms       99.90% ....... 33 ms
        100% ....... 49 ms       99.99% ....... 47 ms
```

RabbitMQ 3.x:
```
---------------------- Sender -----------------------  --------------------- Receiver ----------------------  --------
Time [s]      Count [m]  Rate [m/s]  CPU [%]  RSS [M]  Time [s]      Count [m]  Rate [m/s]  CPU [%]  RSS [M]  Lat [ms]
-----------------------------------------------------  -----------------------------------------------------  --------
     2.1        130,814      65,342        9     69.9       2.1         18,430       9,206        5      7.6     1,221
     4.1        163,580      16,375        5     70.2       4.1         18,867         218        0      7.6     2,168
     6.1        229,114      32,767        6     70.2       6.1         18,867           0        0      7.6         0
     8.1        294,648      32,734        7     70.2       8.1         18,867           0        0      7.6         0
    10.1        360,182      32,734        6     70.2      10.1         18,867           0        0      7.6         0
    12.1        425,716      32,767        6     70.2      12.1         18,867           0        0      7.6         0
receiver timed out
    14.1        458,482      16,367        5     70.2      14.1         18,867           0        0      7.6         0
quiver:  error: PlanoProcessError: Command 'quiver-arrow receive //host.docker.internal//amq/queue/qq1 --impl qpid-proton-c --duration 10m --count 1m --rate 0 --body-size 12 --credit 1000 --transaction-size 0 --timeout 10 --durable --output /tmp/quiver-b1gcup43' returned non-zero exit status 1.
Traceback (most recent call last):
  File "/usr/local/lib/quiver/python/quiver/pair.py", line 144, in run
    _plano.wait(receiver, check=True)
  File "/usr/local/lib/quiver/python/plano/main.py", line 1243, in wait
    raise PlanoProcessError(proc)
plano.main.PlanoProcessError: Command 'quiver-arrow receive //host.docker.internal//amq/queue/qq1 --impl qpid-proton-c --duration 10m --count 1m --rate 0 --body-size 12 --credit 1000 --transaction-size 0 --timeout 10 --durable --output /tmp/quiver-b1gcup43' returned non-zero exit status 1.
```

3. Stream:
```
quiver-arrow send //host.docker.internal//amq/queue/sq1 --durable --count 1m -d 10m --summary --verbose
```

This commit:
```
Count ............................................. 1,000,000 messages
Duration ................................................ 8.7 seconds
Message rate ........................................ 115,154 messages/s
```

RabbitMQ 3.x:
```
Count ............................................. 1,000,000 messages
Duration ............................................... 21.2 seconds
Message rate ......................................... 47,232 messages/s
```

 ### Memory usage

Start RabbitMQ:
```
ERL_MAX_PORTS=3000000 RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+P 3000000 +S 6" make run-broker PLUGINS="rabbitmq_amqp1_0" FULL=1 RABBITMQ_CONFIG_FILE="rabbitmq.conf"
```

```
/bin/cat rabbitmq.conf

tcp_listen_options.sndbuf  = 2048
tcp_listen_options.recbuf  = 2048
vm_memory_high_watermark.relative = 0.95
vm_memory_high_watermark_paging_ratio = 0.95
loopback_users = none
```

Create 50k connections with 2 sessions per connection, i.e. 100k session in total:

```go
package main

import (
	"context"
	"log"
	"time"

	"github.com/Azure/go-amqp"
)

func main() {
	for i := 0; i < 50000; i++ {
		conn, err := amqp.Dial(context.TODO(), "amqp://nuc", &amqp.ConnOptions{SASLType: amqp.SASLTypeAnonymous()})
		if err != nil {
			log.Fatal("dialing AMQP server:", err)
		}
		_, err = conn.NewSession(context.TODO(), nil)
		if err != nil {
			log.Fatal("creating AMQP session:", err)
		}
		_, err = conn.NewSession(context.TODO(), nil)
		if err != nil {
			log.Fatal("creating AMQP session:", err)
		}
	}
	log.Println("opened all connections")
	time.Sleep(5 * time.Hour)
}
```

This commit:
```
erlang:memory().
[{total,4586376480},
 {processes,4025898504},
 {processes_used,4025871040},
 {system,560477976},
 {atom,1048841},
 {atom_used,1042841},
 {binary,233228608},
 {code,21449982},
 {ets,108560464}]

erlang:system_info(process_count).
450289
```
7 procs per connection + 1 proc per session.
(7 + 2*1) * 50,000 = 450,000 procs

RabbitMQ 3.x:
```
erlang:memory().
[{total,15168232704},
 {processes,14044779256},
 {processes_used,14044755120},
 {system,1123453448},
 {atom,1057033},
 {atom_used,1052587},
 {binary,236381264},
 {code,21790238},
 {ets,391423744}]

erlang:system_info(process_count).
1850309
```
7 procs per connection + 15 per session
(7 + 2*15) * 50,000 = 1,850,000 procs

50k connections + 100k session require
with this commit: 4.5 GB
in RabbitMQ 3.x: 15 GB

 ## Future work

1. More efficient parser and serializer
2. TODO in mc_amqp: Do not store the parsed message on disk.
3. Implement both AMQP HTTP extension and AMQP management extension to allow AMQP
clients to create RabbitMQ objects (queues, exchanges, ...).
2024-02-28 14:15:20 +01:00
Rin Kuryloski d5624ab5dc Add gazelle directive to stabilize bazel run gazelle
rabbit_common is indirectly included via rabbit_stream_reader.hrl, and
the rules_erlang gazelle extension does not yet know how to detect
this, therefore the directive manually declares it
2024-02-19 12:53:58 +01:00
Arnaud Cogoluègnes 6e5578a132
Add comment for offset lag calculation 2024-02-13 08:30:59 +01:00
Arnaud Cogoluègnes 5b5c8a4132
Fix consumer offset and offset lag calculation
The offset lag for a consumer is the difference between the
last committed offset (offset of the first message in the
last chunk confirmed by a quorum of stream members) and
the current offset of the consumer (offset of the first message
in the last chunk dispatched to the consumer).

The calculation is simple for most cases, but it needs
to be refined by using more context for edge cases (subscription
to a stream that has not messages yet, subscription at the
very end of a quiet stream).

Example: subscription at "next" (waiting for new messages) in
a quiet stream (no messages published). The previous implementation
would return consumer offset = 0 and offset lag = last committed
offset, where we would expect to get consumer offset = next offset
and offset lag = 0.

This commit fixes the calculation for these edge cases.
2024-02-12 17:41:41 +01:00
Arnaud Cogoluègnes 5e154c206d
Log message in case of stream publisher declaration failure
The "precondition failed" error code is good enough for
an application, but it may be useful when working on a client
library to have more information about the reason of
the failure of the publisher declaration.

This commit adds a warning log message that gives the reason(s)
of the failure (publisher ID already taken and/or existing
publisher with the same reference for the same stream in the
connection).
2024-02-09 16:30:12 +01:00
dependabot[bot] d379aa87b9
build(deps-dev): bump junit.jupiter.version
Bumps `junit.jupiter.version` from 5.10.1 to 5.10.2.

Updates `org.junit.jupiter:junit-jupiter-engine` from 5.10.1 to 5.10.2
- [Release notes](https://github.com/junit-team/junit5/releases)
- [Commits](https://github.com/junit-team/junit5/compare/r5.10.1...r5.10.2)

Updates `org.junit.jupiter:junit-jupiter-params` from 5.10.1 to 5.10.2
- [Release notes](https://github.com/junit-team/junit5/releases)
- [Commits](https://github.com/junit-team/junit5/compare/r5.10.1...r5.10.2)

---
updated-dependencies:
- dependency-name: org.junit.jupiter:junit-jupiter-engine
  dependency-type: direct:development
  update-type: version-update:semver-patch
- dependency-name: org.junit.jupiter:junit-jupiter-params
  dependency-type: direct:development
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2024-02-05 18:08:21 +00:00
dependabot[bot] 9593b77e82
build(deps-dev): bump org.assertj:assertj-core
Bumps [org.assertj:assertj-core](https://github.com/assertj/assertj) from 3.25.2 to 3.25.3.
- [Release notes](https://github.com/assertj/assertj/releases)
- [Commits](https://github.com/assertj/assertj/compare/assertj-build-3.25.2...assertj-build-3.25.3)

---
updated-dependencies:
- dependency-name: org.assertj:assertj-core
  dependency-type: direct:development
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2024-02-05 18:06:09 +00:00
Michael Klishin 9c79ad8d55 More missed license header updates #9969 2024-02-05 12:26:25 -05:00
Michael Klishin f414c2d512
More missed license header updates #9969 2024-02-05 11:53:50 -05:00
Michael Klishin a9ced75476 rabbitmq_stream: add a few missing license headers 2024-01-27 20:46:16 -05:00
Karl Nilsson b1d7037c7d
Merge pull request #10331 from rabbitmq/stream-coordinator-fixes
Stream coordinator: fixes to automatic membership changes.
2024-01-26 15:39:27 +00:00
dependabot[bot] f617244806
build(deps-dev): bump org.assertj:assertj-core
Bumps [org.assertj:assertj-core](https://github.com/assertj/assertj) from 3.25.1 to 3.25.2.
- [Release notes](https://github.com/assertj/assertj/releases)
- [Commits](https://github.com/assertj/assertj/compare/assertj-build-3.25.1...assertj-build-3.25.2)

---
updated-dependencies:
- dependency-name: org.assertj:assertj-core
  dependency-type: direct:development
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2024-01-24 18:37:56 +00:00
Karl Nilsson 45e5cb1dba rabbit_streams_SUITE test fix 2024-01-24 15:12:44 +00:00
dependabot[bot] 61ac44d9ab
build(deps): bump com.diffplug.spotless:spotless-maven-plugin
Bumps [com.diffplug.spotless:spotless-maven-plugin](https://github.com/diffplug/spotless) from 2.42.0 to 2.43.0.
- [Changelog](https://github.com/diffplug/spotless/blob/main/CHANGES.md)
- [Commits](https://github.com/diffplug/spotless/compare/lib/2.42.0...lib/2.43.0)

---
updated-dependencies:
- dependency-name: com.diffplug.spotless:spotless-maven-plugin
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2024-01-23 18:50:10 +00:00
Arnaud Cogoluègnes 213281e812
Fix dependency in stream unit test 2024-01-23 10:51:44 +01:00
Michael Klishin 7b151a7651 More missed (c) header updates 2024-01-22 23:44:47 -05:00
Arnaud Cogoluègnes ed3aecefd0
Use read permission for store_offset
Instead of write permission. Storing an offset in a stream
is not like adding messages to the stream, so we consider
the write permission to be too hard for this case.
Read permission, like for consuming, is more appropriate.

The implementation checks there is a subscription for
the target stream and falls back to an actual read
permission call if there is not (e.g. when the subscription
state goes away before store_offset is sent when closing a consumer).

Fixes #10383
2024-01-22 17:25:40 +01:00
Arnaud Cogoluègnes 3bd0fdc316
Merge pull request #10299 from rabbitmq/token-expiration-in-stream-connections
Take expiry timestamp into account in stream connections
2024-01-22 10:23:20 +01:00
Arnaud Cogoluègnes fb4f50dd8e
Catch ETS exception when system is still starting
Because the ETS table may be not created yet.
A stream connection can call another node to
get the port of the stream plugin on this node.
The stream plugin uses the rabbit_networking module,
which relies on an ETS table to store listener information.
The table may be not created yet when the node starts up,
so the calling node ends up logging a large stack trace.

This commit catches the exception and logs a simple message.
This reduces unnecessary noise in the logs.
2024-01-19 16:24:24 +01:00
Arnaud Cogoluègnes 6fc3187ccb
Do not check stream write access on every publish
Stream write access is already checked when creating the publisher.

Fixes #10371
2024-01-19 14:46:57 +01:00
Arnaud Cogoluègnes bc2a11d1bd
Re-evaluate stream permissions after secret update
Re-evaluate permissions, cancel publishers and
subscriptions, send metadata update accordingly.

Move record definitions from the stream reader
to a dedicated header file to be able to write
unit tests.

Fixes #10292
2024-01-19 14:46:57 +01:00
dependabot[bot] 2a94ea06f4
build(deps): bump com.diffplug.spotless:spotless-maven-plugin
Bumps [com.diffplug.spotless:spotless-maven-plugin](https://github.com/diffplug/spotless) from 2.41.1 to 2.42.0.
- [Changelog](https://github.com/diffplug/spotless/blob/main/CHANGES.md)
- [Commits](https://github.com/diffplug/spotless/compare/maven/2.41.1...lib/2.42.0)

---
updated-dependencies:
- dependency-name: com.diffplug.spotless:spotless-maven-plugin
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2024-01-15 18:05:48 +00:00
dependabot[bot] bf79c54bce
build(deps): bump org.apache.maven.plugins:maven-surefire-plugin
Bumps [org.apache.maven.plugins:maven-surefire-plugin](https://github.com/apache/maven-surefire) from 3.2.3 to 3.2.5.
- [Release notes](https://github.com/apache/maven-surefire/releases)
- [Commits](https://github.com/apache/maven-surefire/compare/surefire-3.2.3...surefire-3.2.5)

---
updated-dependencies:
- dependency-name: org.apache.maven.plugins:maven-surefire-plugin
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2024-01-10 18:58:00 +00:00
Arnaud Cogoluègnes b4804a8015
Merge pull request #10271 from rabbitmq/fix-stream-topology-error-code
Return error atom according to specification
2024-01-04 10:04:03 +01:00
dependabot[bot] 92fa8f34f3
build(deps-dev): bump org.assertj:assertj-core
Bumps [org.assertj:assertj-core](https://github.com/assertj/assertj) from 3.25.0 to 3.25.1.
- [Release notes](https://github.com/assertj/assertj/releases)
- [Commits](https://github.com/assertj/assertj/compare/assertj-build-3.25.0...assertj-build-3.25.1)

---
updated-dependencies:
- dependency-name: org.assertj:assertj-core
  dependency-type: direct:development
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2024-01-03 18:14:32 +00:00
Arnaud Cogoluègnes 6a330563ce
Return error atom according to specification
In stream topology function call. This would
trigger an exception in the frame creation
when the stream was not available because the atom
was unexpected.
2024-01-03 18:15:03 +01:00
Michael Klishin 01092ff31f
(c) year bumps 2024-01-01 22:02:20 -05:00
dependabot[bot] f14b2d3755
build(deps-dev): bump org.assertj:assertj-core
Bumps [org.assertj:assertj-core](https://github.com/assertj/assertj) from 3.24.2 to 3.25.0.
- [Release notes](https://github.com/assertj/assertj/releases)
- [Commits](https://github.com/assertj/assertj/compare/assertj-build-3.24.2...assertj-build-3.25.0)

---
updated-dependencies:
- dependency-name: org.assertj:assertj-core
  dependency-type: direct:development
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2024-01-01 18:42:24 +00:00
dependabot[bot] ce8532c280
build(deps): bump org.apache.maven.plugins:maven-compiler-plugin
Bumps [org.apache.maven.plugins:maven-compiler-plugin](https://github.com/apache/maven-compiler-plugin) from 3.12.0 to 3.12.1.
- [Release notes](https://github.com/apache/maven-compiler-plugin/releases)
- [Commits](https://github.com/apache/maven-compiler-plugin/compare/maven-compiler-plugin-3.12.0...maven-compiler-plugin-3.12.1)

---
updated-dependencies:
- dependency-name: org.apache.maven.plugins:maven-compiler-plugin
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-12-25 18:22:44 +00:00
Arnaud Cogoluègnes 42bdcfb8e4
Add incremental ID for non-dedup stream publishers
The incremental, internal ID is connection-scoped and
embedded in the correlation ID. This allows to detect
late publisher confirms coming from a closed
publisher sharing the same publisher ID (client-set) with
a new publisher.

Such late publisher confirms are just discarded.

Only publishers without a reference (i.e. without deduplication
activated) benefit from this mechanism, as the correlation ID
for publishers with a reference must be an integer, that
Osiris uses for deduplication.

Osiris API may be revised in the future to allow for more flexibility
with the correlation ID used for publishers with a reference.
2023-12-20 11:06:01 +01:00
dependabot[bot] 32700b8d41
build(deps): bump org.apache.maven.plugins:maven-compiler-plugin
Bumps [org.apache.maven.plugins:maven-compiler-plugin](https://github.com/apache/maven-compiler-plugin) from 3.11.0 to 3.12.0.
- [Release notes](https://github.com/apache/maven-compiler-plugin/releases)
- [Commits](https://github.com/apache/maven-compiler-plugin/compare/maven-compiler-plugin-3.11.0...maven-compiler-plugin-3.12.0)

---
updated-dependencies:
- dependency-name: org.apache.maven.plugins:maven-compiler-plugin
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-12-19 18:59:55 +00:00
dependabot[bot] 514f10eb43
build(deps): bump org.apache.maven.plugins:maven-surefire-plugin
Bumps [org.apache.maven.plugins:maven-surefire-plugin](https://github.com/apache/maven-surefire) from 3.2.2 to 3.2.3.
- [Release notes](https://github.com/apache/maven-surefire/releases)
- [Commits](https://github.com/apache/maven-surefire/compare/surefire-3.2.2...surefire-3.2.3)

---
updated-dependencies:
- dependency-name: org.apache.maven.plugins:maven-surefire-plugin
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-12-13 18:38:46 +00:00
David Ansari 1e09ed94f4 Fix rabbit_stream_reader API 2023-12-11 15:39:20 +01:00
Michael Klishin e660364e07
Merge pull request #10054 from rabbitmq/dependabot/maven/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/main/com.diffplug.spotless-spotless-maven-plugin-2.41.1
Bump com.diffplug.spotless:spotless-maven-plugin from 2.37.0 to 2.41.1 in /deps/rabbitmq_stream/test/rabbit_stream_SUITE_data
2023-12-05 16:40:42 -05:00
dependabot[bot] 9e21f1f297
Bump com.diffplug.spotless:spotless-maven-plugin
Bumps [com.diffplug.spotless:spotless-maven-plugin](https://github.com/diffplug/spotless) from 2.37.0 to 2.41.1.
- [Changelog](https://github.com/diffplug/spotless/blob/main/CHANGES.md)
- [Commits](https://github.com/diffplug/spotless/compare/lib/2.37.0...maven/2.41.1)

---
updated-dependencies:
- dependency-name: com.diffplug.spotless:spotless-maven-plugin
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-12-05 18:09:22 +00:00
dependabot[bot] 2833384de3
Bump junit.jupiter.version
Bumps `junit.jupiter.version` from 5.9.3 to 5.10.1.

Updates `org.junit.jupiter:junit-jupiter-engine` from 5.9.3 to 5.10.1
- [Release notes](https://github.com/junit-team/junit5/releases)
- [Commits](https://github.com/junit-team/junit5/compare/r5.9.3...r5.10.1)

Updates `org.junit.jupiter:junit-jupiter-params` from 5.9.3 to 5.10.1
- [Release notes](https://github.com/junit-team/junit5/releases)
- [Commits](https://github.com/junit-team/junit5/compare/r5.9.3...r5.10.1)

---
updated-dependencies:
- dependency-name: org.junit.jupiter:junit-jupiter-engine
  dependency-type: direct:development
  update-type: version-update:semver-minor
- dependency-name: org.junit.jupiter:junit-jupiter-params
  dependency-type: direct:development
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-12-05 18:09:17 +00:00
dependabot[bot] af4aa3ef5c
Bump ch.qos.logback:logback-classic
Bumps [ch.qos.logback:logback-classic](https://github.com/qos-ch/logback) from 1.2.12 to 1.2.13.
- [Commits](https://github.com/qos-ch/logback/compare/v_1.2.12...v_1.2.13)

---
updated-dependencies:
- dependency-name: ch.qos.logback:logback-classic
  dependency-type: direct:development
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-12-01 18:49:04 +00:00
Michael Klishin 1b642353ca
Update (c) according to [1]
1. https://investors.broadcom.com/news-releases/news-release-details/broadcom-and-vmware-intend-close-transaction-november-22-2023
2023-11-21 23:18:22 -05:00
Arnaud Cogoluègnes f118d2fdce
Small refactor of stream tests 2023-11-17 10:49:12 +01:00
Arnaud Cogoluègnes 329988c483
Remove syntax test 2023-11-17 09:00:08 +01:00
Arnaud Cogoluègnes 0e5d15592a
Check virtual host queue limit on stream creation
Fixes #9910
2023-11-17 08:45:42 +01:00
Arnaud Cogoluègnes 739928153e
Add --binding-keys to add_super_stream command
It is meant to replace --routing-keys, which
is still available but not documented anymore.

This is to be consistent with AMQP 0.9.1 terminology.
2023-11-15 14:00:07 +01:00
Arnaud Cogoluègnes 5e1155c523
Fix function spec 2023-11-13 16:30:37 +01:00
Arnaud Cogoluègnes 22698a4adc
Simplify maybe block 2023-11-13 15:55:08 +01:00
Arnaud Cogoluègnes 2eb9b5f87b
Make sure partitions and binding keys counts are equal 2023-11-13 15:48:23 +01:00
Arnaud Cogoluègnes 9beda5702e
Use "binding key" lingo for super stream creation
Instead of "routing key".
2023-11-13 14:54:45 +01:00
Arnaud Cogoluègnes 809e97fd38
Use maybe syntax for super stream management permissions 2023-11-13 10:30:26 +01:00
Arnaud Cogoluègnes 043ddcba95
Add super stream creation/deletion test 2023-11-13 10:30:25 +01:00
Arnaud Cogoluègnes d04ccdb0ce
Return correct frame name 2023-11-13 10:30:25 +01:00
Arnaud Cogoluègnes 388608143d
Add super stream add/delete protocol commands 2023-11-13 10:30:25 +01:00
Arnaud Cogoluègnes 15f93f338c
Add super stream creation/deletion commands in spec 2023-11-13 10:30:24 +01:00
dependabot[bot] c3676c6c79
Bump org.apache.maven.plugins:maven-surefire-plugin
Bumps [org.apache.maven.plugins:maven-surefire-plugin](https://github.com/apache/maven-surefire) from 3.2.1 to 3.2.2.
- [Release notes](https://github.com/apache/maven-surefire/releases)
- [Commits](https://github.com/apache/maven-surefire/compare/surefire-3.2.1...surefire-3.2.2)

---
updated-dependencies:
- dependency-name: org.apache.maven.plugins:maven-surefire-plugin
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-11-07 18:30:48 +00:00
Iliia Khaprov c577e04b73 Remove POODLE check, we are in the future 2023-11-01 10:53:27 +01:00
David Ansari fb18ddf42c Deduplicate code 2023-10-25 15:58:13 +02:00
dependabot[bot] feaeea2f2e
Bump org.apache.maven.plugins:maven-surefire-plugin
Bumps [org.apache.maven.plugins:maven-surefire-plugin](https://github.com/apache/maven-surefire) from 3.1.2 to 3.2.1.
- [Release notes](https://github.com/apache/maven-surefire/releases)
- [Commits](https://github.com/apache/maven-surefire/compare/surefire-3.1.2...surefire-3.2.1)

---
updated-dependencies:
- dependency-name: org.apache.maven.plugins:maven-surefire-plugin
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-10-24 18:22:19 +00:00
Michael Klishin 55bd90c0e4
Merge pull request #9187 from rabbitmq/update-secret
Update secret on the stream protocol
2023-10-23 19:46:52 -04:00
Arnaud Cogoluègnes c04d324b9f
Fix list_stream_consumer_groups validation
It had been broken in a refactoring to squash
dialyzer warnings.
2023-10-10 16:44:05 +02:00
Arnaud Cogoluègnes 65d381ef6c
Polish list_stream_tracking
Correct output when there is no line, fix usage, formatting.
2023-10-10 10:57:48 +02:00
Arnaud Cogoluègnes b541be658e
Use tracking_value for column header 2023-10-10 09:14:24 +02:00
Arnaud Cogoluègnes dc5a63dfbd
Fix command file name 2023-10-10 09:14:24 +02:00
Arnaud Cogoluègnes cfa7c26760
Add command to Bazel configuration 2023-10-10 09:14:23 +02:00
Arnaud Cogoluègnes 048a4998ed
Add list_stream_tracking 2023-10-10 09:14:23 +02:00
Arnaud Cogoluègnes 944b34e4e0 Add pid and node columns to list_stream_connections
Fixes #9555
2023-09-30 14:59:25 -04:00
Arnaud Cogoluègnes db65caa91f Add node column to list_stream_publishers/consumers
Fixes #9582
2023-09-30 14:59:25 -04:00
Arnaud Cogoluègnes 07d7f09a60
Fix a couple for dialyzer warnings 2023-09-28 17:44:55 +02:00
Arnaud Cogoluègnes 289fc51def
Monitor members to the same stream in stream connection
This commit change makes sure that different members
of the same stream are monitored in a given connection.
A connection can use the stream leader for a publisher
and a replica for a consumer, if the replica crashes
for some reason, the connection must detect it and
send a metadata notification to the client.

Note that client connections should still use different
connections for publishers and consumers.
2023-09-28 17:09:29 +02:00
Arnaud Cogoluègnes e22fcd70fe
Make MC conversion function return ok or error 2023-09-18 18:32:59 +02:00
Karl Nilsson 119f034406
Message Containers (#5077)
This PR implements an approach for a "protocol (data format) agnostic core" where the format of the message isn't converted at point of reception.

Currently all non AMQP 0.9.1 originating messages are converted into a AMQP 0.9.1 flavoured basic_message record before sent to a queue. If the messages are then consumed by the originating protocol they are converted back from AMQP 0.9.1. For some protocols such as MQTT 3.1 this isn't too expensive as MQTT is mostly a fairly easily mapped subset of AMQP 0.9.1 but for others such as AMQP 1.0 the conversions are awkward and in some cases lossy even if consuming from the originating protocol.

This PR instead wraps all incoming messages in their originating form into a generic, extensible message container type (mc). The container module exposes an API to get common message details such as size and various properties (ttl, priority etc) directly from the source data type. Each protocol needs to implement the mc behaviour such that when a message originating form one protocol is consumed by another protocol we convert it to the target protocol at that point.

The message container also contains annotations, dead letter records and other meta data we need to record during the lifetime of a message. The original protocol message is never modified unless it is consumed.

This includes conversion modules to and from amqp, amqpl (AMQP 0.9.1) and mqtt.


COMMIT HISTORY:

* Refactor away from using the delivery{} record

In many places including exchange types. This should make it
easier to move towards using a message container type instead of
basic_message.

Add mc module and move direct replies outside of exchange

Lots of changes incl classic queues

Implement stream support incl amqp conversions

simplify mc state record

move mc.erl

mc dlx stuff

recent history exchange

Make tracking work

But doesn't take a protocol agnostic approach as we just convert
everything into AMQP legacy and back. Might be good enough for now.

Tracing as a whole may want a bit of a re-vamp at some point.

tidy

make quorum queue peek work by legacy conversion

dead lettering fixes

dead lettering fixes

CMQ fixes

rabbit_trace type fixes

fixes

fix

Fix classic queue props

test assertion fix

feature flag and backwards compat

Enable message_container feature flag in some SUITEs

Dialyzer fixes

fixes

fix

test fixes

Various

Manually update a gazelle generated file

until a gazelle enhancement can be made
https://github.com/rabbitmq/rules_erlang/issues/185

Add message_containers_SUITE to bazel

and regen bazel files with gazelle from rules_erlang@main

Simplify essential proprty access

Such as durable, ttl and priority by extracting them into annotations
at message container init time.

Move type

to remove dependenc on amqp10 stuff in mc.erl

mostly because I don't know how to make bazel do the right thing

add more stuff

Refine routing header stuff

wip

Cosmetics

Do not use "maybe" as type name as "maybe" is a keyword since OTP 25
which makes Erlang LS complain.

* Dedup death queue names

* Fix function clause crashes

Fix failing tests in the MQTT shared_SUITE:
A classic queue message ID can be undefined as set in
fbe79ff47b/deps/rabbit/src/rabbit_classic_queue_index_v2.erl (L1048)

Fix failing tests in the MQTT shared_SUITE-mixed:
When feature flag message_containers is disabled, the
message is not an #mc{} record, but a #basic_message{} record.

* Fix is_utf8_no_null crash

Prior to this commit, the function crashed if invalid UTF-8 was
provided, e.g.:
```
1> rabbit_misc:is_valid_shortstr(<<"😇"/utf16>>).
** exception error: no function clause matching rabbit_misc:is_utf8_no_null(<<216,61,222,7>>) (rabbit_misc.erl, line 1481)
```

* Implement mqtt mc behaviour

For now via amqp translation.

This is still work in progress, but the following SUITEs pass:
```
make -C deps/rabbitmq_mqtt ct-shared t=[mqtt,v5,cluster_size_1] FULL=1
make -C deps/rabbitmq_mqtt ct-v5 t=[mqtt,cluster_size_1] FULL=1
```

* Shorten mc file names

Module name length matters because for each persistent message the #mc{}
record is persisted to disk.

```
1> iolist_size(term_to_iovec({mc, rabbit_mc_amqp_legacy})).
30
2> iolist_size(term_to_iovec({mc, mc_amqpl})).
17
```

This commit renames the mc modules:
```
ag -l rabbit_mc_amqp_legacy | xargs sed -i 's/rabbit_mc_amqp_legacy/mc_amqpl/g'
ag -l rabbit_mc_amqp | xargs sed -i 's/rabbit_mc_amqp/mc_amqp/g'
ag -l rabbit_mqtt_mc | xargs sed -i 's/rabbit_mqtt_mc/mc_mqtt/g'
```

* mc: make deaths an annotation + fixes

* Fix mc_mqtt protocol_state callback

* Fix test will_delay_node_restart

```
make -C deps/rabbitmq_mqtt ct-v5 t=[mqtt,cluster_size_3]:will_delay_node_restart FULL=1
```

* Bazel run gazelle

* mix format rabbitmqctl.ex

* Ensure ttl annotation is refelected in amqp legacy protocol state

* Fix id access in message store

* Fix rabbit_message_interceptor_SUITE

* dializer fixes

* Fix rabbit:rabbit_message_interceptor_SUITE-mixed

set_annotation/3 should not result in duplicate keys

* Fix MQTT shared_SUITE-mixed

Up to 3.12 non-MQTT publishes were always QoS 1 regardless of delivery_mode.
75a953ce28/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl (L2075-L2076)
From now on, non-MQTT publishes are QoS 1 if durable.
This makes more sense.

The MQTT plugin must send a #basic_message{} to an old node that does
not understand message containers.

* Field content of 'v1_0.data' can be binary

Fix
```
bazel test //deps/rabbitmq_mqtt:shared_SUITE-mixed \
    --test_env FOCUS="-group [mqtt,v4,cluster_size_1] -case trace" \
    -t- --test_sharding_strategy=disabled
```

* Remove route/2 and implement route/3 for all exchange types.

This removes the route/2 callback from rabbit_exchange_type and
makes route/3 mandatory instead. This is a breaking change and
will require all implementations of exchange types to update their
code, however this is necessary anyway for them to correctly handle
the mc type.

stream filtering fixes

* Translate directly from MQTT to AMQP 0.9.1

* handle undecoded properties in mc_compat

amqpl: put clause in right order

recover death deatails from amqp data

* Replace callback init_amqp with convert_from

* Fix return value of lists:keyfind/3

* Translate directly from AMQP 0.9.1 to MQTT

* Fix MQTT payload size

MQTT payload can be a list when converted from AMQP 0.9.1 for example

First conversions tests

Plus some other conversion related fixes.

bazel

bazel

translate amqp 1.0 null to undefined

mc: property/2 and correlation_id/message_id return type tagged values.

To ensure we can support a variety of types better.

The type type tags are AMQP 1.0 flavoured.

fix death recovery

mc_mqtt: impl new api

Add callbacks to allow protocols to compact data before storage

And make readable if needing to query things repeatedly.

bazel fix

* more decoding

* tracking mixed versions compat

* mc: flip default of `durable` annotation to save some data.

Assuming most messages are durable and that in memory messages suffer less
from persistence overhead it makes sense for a non existent `durable`
annotation to mean durable=true.

* mc conversion tests and tidy up

* mc make x_header unstrict again

* amqpl: death record fixes

* bazel

* amqp -> amqpl conversion test

* Fix crash in mc_amqp:size/1

Body can be a single amqp-value section (instead of
being a list) as shown by test
```
make -C deps/rabbitmq_amqp1_0/ ct-system t=java
```
on branch native-amqp.

* Fix crash in lists:flatten/1

Data can be a single amqp-value section (instead of
being a list) as shown by test
```
make -C deps/rabbitmq_amqp1_0 ct-system t=dotnet:roundtrip_to_amqp_091
```
on branch native-amqp.

* Fix crash in rabbit_writer

Running test
```
make -C deps/rabbitmq_amqp1_0 ct-system t=dotnet:roundtrip_to_amqp_091
```
on branch native-amqp resulted in the following crash:
```
crasher:
  initial call: rabbit_writer:enter_mainloop/2
  pid: <0.711.0>
  registered_name: []
  exception error: bad argument
    in function  size/1
       called as size([<<0>>,<<"Sw">>,[<<160,2>>,<<"hi">>]])
       *** argument 1: not tuple or binary
    in call from rabbit_binary_generator:build_content_frames/7 (rabbit_binary_generator.erl, line 89)
    in call from rabbit_binary_generator:build_simple_content_frames/4 (rabbit_binary_generator.erl, line 61)
    in call from rabbit_writer:assemble_frames/5 (rabbit_writer.erl, line 334)
    in call from rabbit_writer:internal_send_command_async/3 (rabbit_writer.erl, line 365)
    in call from rabbit_writer:handle_message/2 (rabbit_writer.erl, line 265)
    in call from rabbit_writer:handle_message/3 (rabbit_writer.erl, line 232)
    in call from rabbit_writer:mainloop1/2 (rabbit_writer.erl, line 223)
```
because #content.payload_fragments_rev is currently supposed to
be a flat list of binaries instead of being an iolist.

This commit fixes this crash inefficiently by calling
iolist_to_binary/1. A better solution would be to allow AMQP legacy's #content.payload_fragments_rev
to be an iolist.

* Add accidentally deleted line back

* mc: optimise mc_amqp internal format

By removint the outer records for message and delivery annotations
as well as application properties and footers.

* mc: optimis mc_amqp map_add by using upsert

* mc: refactoring and bug fixes

* mc_SUITE routingheader assertions

* mc remove serialize/1 callback as only used by amqp

* mc_amqp: avoid returning a nested list from protocol_state

* test and bug fix

* move infer_type to mc_util

* mc fixes and additiona assertions

* Support headers exchange routing for MQTT messages

When a headers exchange is bound to the MQTT topic exchange, routing
will be performend based on both MQTT topic (by the topic exchange) and
MQTT User Property (by the headers exchange).

This combines the best worlds of both MQTT 5.0 and AMQP 0.9.1 and
enables powerful routing topologies.

When the User Property contains the same name multiple times, only the
last name (and value) will be considered by the headers exchange.

* Fix crash when sending from stream to amqpl

When publishing a message via the stream protocol and consuming it via
AMQP 0.9.1, the following crash occurred prior to this commit:
```
crasher:
  initial call: rabbit_channel:init/1
  pid: <0.818.0>
  registered_name: []
  exception exit: {{badmatch,undefined},
                   [{rabbit_channel,handle_deliver0,4,
                                    [{file,"rabbit_channel.erl"},
                                     {line,2728}]},
                    {lists,foldl,3,[{file,"lists.erl"},{line,1594}]},
                    {rabbit_channel,handle_cast,2,
                                    [{file,"rabbit_channel.erl"},
                                     {line,728}]},
                    {gen_server2,handle_msg,2,
                                 [{file,"gen_server2.erl"},{line,1056}]},
                    {proc_lib,wake_up,3,
                              [{file,"proc_lib.erl"},{line,251}]}]}
```

This commit first gives `mc:init/3` the chance to set exchange and
routing_keys annotations.
If not set, `rabbit_stream_queue` will set these annotations assuming
the message was originally published via the stream protocol.

* Support consistent hash exchange routing for MQTT 5.0

When a consistent hash exchange is bound to the MQTT topic exchange,
MQTT 5.0 messages can be routed to queues consistently based on the
Correlation-Data in the PUBLISH packet.

* Convert MQTT 5.0 User Property

* to AMQP 0.9.1 headers
* from AMQP 0.9.1 headers
* to AMQP 1.0 application properties and message annotations
* from AMQP 1.0 application properties and message annotations

* Make use of Annotations in mc_mqtt:protocol_state/2

mc_mqtt:protocol_state/2 includes Annotations as parameter.
It's cleaner to make use of these Annotations when computing the
protocol state instead of relying on the caller (rabbitmq_mqtt_processor)
to compute the protocol state.

* Enforce AMQP 0.9.1 field name length limit

The AMQP 0.9.1 spec prohibits field names longer than 128 characters.
Therefore, when converting AMQP 1.0 message annotations, application
properties or MQTT 5.0 User Property to AMQP 0.9.1 headers, drop any
names longer than 128 characters.

* Fix type specs

Apply feedback from Michael Davis

Co-authored-by: Michael Davis <mcarsondavis@gmail.com>

* Add mc_mqtt unit test suite

Implement mc_mqtt:x_header/2

* Translate indicator that payload is UTF-8 encoded

when converting between MQTT 5.0 and AMQP 1.0

* Translate single amqp-value section from AMQP 1.0 to MQTT

Convert to a text representation, if possible, and indicate to MQTT
client that the payload is UTF-8 encoded. This way, the MQTT client will
be able to parse the payload.

If conversion to text representation is not possible, encode the payload
using the AMQP 1.0 type system and indiate the encoding via Content-Type
message/vnd.rabbitmq.amqp.

This Content-Type is not registered.
Type "message" makes sense since it's a message.
Vendor tree "vnd.rabbitmq.amqp" makes sense since merely subtype "amqp" is not
registered.

* Fix payload conversion

* Translate Response Topic between MQTT and AMQP

Translate MQTT 5.0 Response Topic to AMQP 1.0 reply-to address and vice
versa.

The Response Topic must be a UTF-8 encoded string.

This commit re-uses the already defined RabbitMQ target addresses:
```
"/topic/"     RK        Publish to amq.topic with routing key RK
"/exchange/"  X "/" RK  Publish to exchange X with routing key RK
```

By default, the MQTT topic exchange is configure dto be amq.topic using
the 1st target address.

When an operator modifies the mqtt.exchange, the 2nd target address is
used.

* Apply PR feedback

and fix formatting

Co-authored-by: Michael Davis <mcarsondavis@gmail.com>

* tidy up

* Add MQTT message_containers test

* consistent hash exchange: avoid amqp legacy conversion

When hashing on a header value.

* Avoid converting to amqp legacy when using exchange federation

* Fix test flake

* test and dialyzer fixes

* dialyzer fix

* Add MQTT protocol interoperability tests

Test receiving from and sending to MQTT 5.0 and
* AMQP 0.9.1
* AMQP 1.0
* STOMP
* Streams

* Regenerate portions of deps/rabbit/app.bzl with gazelle

I'm not exactly sure how this happened, but gazell seems to have been
run with an older version of the rules_erlang gazelle extension at
some point. This caused generation of a structure that is no longer
used. This commit updates the structure to the current pattern.

* mc: refactoring

* mc_amqpl: handle delivery annotations

Just in case they are included.

Also use iolist_to_iovec to create flat list of binaries when
converting from amqp with amqp encoded payload.

---------

Co-authored-by: David Ansari <david.ansari@gmx.de>
Co-authored-by: Michael Davis <mcarsondavis@gmail.com>
Co-authored-by: Rin Kuryloski <kuryloskip@vmware.com>
2023-08-31 11:27:13 +01:00
Marcial Rosales b715a4d07c Update secret via sasl_authentication frame 2023-08-25 11:32:24 +01:00
Arnaud Cogoluègnes c594c77049
Adapt stream Java tests to client 0.12.0 snapshot
After changes for
https://github.com/rabbitmq/rabbitmq-stream-java-client/issues/333.
2023-07-17 11:04:25 +02:00
Arnaud Cogoluègnes 6ed8b30780
Merge pull request #8848 from rabbitmq/dependabot/maven/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/main/com.diffplug.spotless-spotless-maven-plugin-2.37.0
Bump spotless-maven-plugin from 2.24.0 to 2.37.0 in /deps/rabbitmq_stream/test/rabbit_stream_SUITE_data
2023-07-13 17:25:10 +02:00
dependabot[bot] dfc2ee65d8
Bump spotless-maven-plugin
Bumps [spotless-maven-plugin](https://github.com/diffplug/spotless) from 2.24.0 to 2.37.0.
- [Changelog](https://github.com/diffplug/spotless/blob/main/CHANGES.md)
- [Commits](https://github.com/diffplug/spotless/compare/lib/2.24.0...lib/2.37.0)

---
updated-dependencies:
- dependency-name: com.diffplug.spotless:spotless-maven-plugin
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-07-13 15:17:08 +00:00
dependabot[bot] 18d36662df
Bump junit.jupiter.version
Bumps `junit.jupiter.version` from 5.9.0 to 5.9.3.

Updates `junit-jupiter-engine` from 5.9.0 to 5.9.3
- [Release notes](https://github.com/junit-team/junit5/releases)
- [Commits](https://github.com/junit-team/junit5/compare/r5.9.0...r5.9.3)

Updates `junit-jupiter-params` from 5.9.0 to 5.9.3
- [Release notes](https://github.com/junit-team/junit5/releases)
- [Commits](https://github.com/junit-team/junit5/compare/r5.9.0...r5.9.3)

---
updated-dependencies:
- dependency-name: org.junit.jupiter:junit-jupiter-engine
  dependency-type: direct:development
  update-type: version-update:semver-patch
- dependency-name: org.junit.jupiter:junit-jupiter-params
  dependency-type: direct:development
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-07-13 15:17:02 +00:00
Arnaud Cogoluègnes 742c8ed994
Use stream Java client 0.11.0 in test suites
Just temporarily. This removes the need to use
a snapshot repository, which makes dependabot fail.

There's also a stream Java client update with
internal breaking changes, so we'll come back
to the latest snapshot once the tests are updated.
2023-07-13 17:12:51 +02:00
Arnaud Cogoluègnes 7bc26154b5
Add stream filtering feature flag 2023-07-10 15:21:53 +02:00
Arnaud Cogoluègnes 065844a6d6
Support stream-filter-size-bytes in stream protocol 2023-07-10 15:21:53 +02:00
Arnaud Cogoluègnes 58dc8b9d40
Add test for stream filtering 2023-07-10 15:21:52 +02:00
Arnaud Cogoluègnes 62c83b0b9d
Use "filter."-shaped subscription properties
To set filter values in stream protocol.
2023-07-10 15:21:52 +02:00
Arnaud Cogoluègnes 9f06fb7db9
Add support for stream filtering
WIP.
2023-07-10 15:21:52 +02:00
dependabot[bot] 6f3398a49a
Bump logback-classic
Bumps [logback-classic](https://github.com/qos-ch/logback) from 1.2.11 to 1.2.12.
- [Commits](https://github.com/qos-ch/logback/compare/v_1.2.11...v_1.2.12)

---
updated-dependencies:
- dependency-name: ch.qos.logback:logback-classic
  dependency-type: direct:development
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-06-30 15:32:16 +00:00
dependabot[bot] 14c43d6c30
Bump maven-compiler-plugin
Bumps [maven-compiler-plugin](https://github.com/apache/maven-compiler-plugin) from 3.10.1 to 3.11.0.
- [Release notes](https://github.com/apache/maven-compiler-plugin/releases)
- [Commits](https://github.com/apache/maven-compiler-plugin/compare/maven-compiler-plugin-3.10.1...maven-compiler-plugin-3.11.0)

---
updated-dependencies:
- dependency-name: org.apache.maven.plugins:maven-compiler-plugin
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-06-30 15:07:23 +00:00
Arnaud Cogoluègnes b39088ab87
Merge pull request #8721 from rabbitmq/dependabot/maven/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/main/org.apache.maven.plugins-maven-surefire-plugin-3.1.2
Bump maven-surefire-plugin from 2.22.2 to 3.1.2 in /deps/rabbitmq_stream/test/rabbit_stream_SUITE_data
2023-06-30 17:06:24 +02:00
dependabot[bot] e568bb6f9a
Bump maven-surefire-plugin
Bumps [maven-surefire-plugin](https://github.com/apache/maven-surefire) from 2.22.2 to 3.1.2.
- [Release notes](https://github.com/apache/maven-surefire/releases)
- [Commits](https://github.com/apache/maven-surefire/compare/surefire-2.22.2...surefire-3.1.2)

---
updated-dependencies:
- dependency-name: org.apache.maven.plugins:maven-surefire-plugin
  dependency-type: direct:production
  update-type: version-update:semver-major
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-06-30 14:02:53 +00:00
dependabot[bot] 2a19ba07c2
Bump assertj-core in /deps/rabbitmq_stream/test/rabbit_stream_SUITE_data
Bumps assertj-core from 3.23.1 to 3.24.2.

---
updated-dependencies:
- dependency-name: org.assertj:assertj-core
  dependency-type: direct:development
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-06-30 14:02:31 +00:00
Arnaud Cogoluègnes 1315f5c26c
Bump Google Java Format 2023-06-30 10:03:56 +02:00
David Ansari 66fe9630b5 Add Message Expiry Interval for retained messages
MQTT v5 spec:
"If the current retained message for a Topic expires, it is discarded
and there will be no retained message for that topic."

This commit also supports Message Expiry Interval for retained messages
when a node is restarted.
Therefore, the insertion timestamp needs to be stored on disk.
Upon recovery, the Erlang timers are re-created.
2023-06-21 17:14:08 +01:00
Michael Klishin 55442aa914 Replace @rabbitmq.com addresses with rabbitmq-core@groups.vmware.com
Don't ask why we have to do it. Because reasons!
2023-06-20 15:40:13 +04:00
Arnaud Cogoluègnes 6c14d736ab
Polish authentication in stream reader 2023-06-06 13:04:35 +02:00
Arnaud Cogoluègnes 21347490f9
Fix SASL external authentication in stream plugin 2023-06-06 11:08:02 +02:00
Rin Kuryloski eb94a58bc9 Add a workflow to compare the bazel/erlang.mk output
To catch any drift between the builds
2023-05-15 13:54:14 +02:00
Alex Blease 8443c761ee
Include UnsubscribeResponse in stream protocol 2023-05-15 10:44:46 +01:00
Rin Kuryloski a944439fba Replace globs in bazel with explicit lists of files
As this is preferred in rules_erlang 3.9.14
2023-04-25 17:29:12 +02:00
Michal Kuratczyk d04b3afe9b verify_none in a couple of tests 2023-04-24 13:11:44 +00:00
Rin Kuryloski 854d01d9a5 Restore the original -include_lib statements from before #6466
since this broke erlang_ls

requires rules_erlang 3.9.13
2023-04-20 12:40:45 +02:00
Michael Klishin c0ed80c625
Merge pull request #6466 from rabbitmq/gazelle
Use gazelle for some maintenance of bazel BUILD files
2023-04-19 09:33:44 +04:00
Rin Kuryloski 8de8f59d47 Use gazelle generated bazel files
Bazel build files are now maintained primarily with `bazel run
gazelle`. This will analyze and merge changes into the build files as
necessitated by certain code changes (e.g. the introduction of new
modules).

In some cases there hints to gazelle in the build files, such as `#
gazelle:erlang...` or `# keep` comments. xref checks on plugins that
depend on the cli are a good example.
2023-04-17 18:13:18 +02:00
Michael Klishin 0f8a5899de
Merge branch 'main' into otp26-compatibility 2023-04-17 14:23:21 +04:00
Rin Kuryloski 8a7eee6a86 Ignore warnings when building plt files for dependencies
As we don't generally care if a dependency has warnings, only the
target
2023-04-17 10:09:24 +02:00
Michal Kuratczyk dcc24fce1c
Set `{verify, verify_none}` in rabbit_stream_SUITE 2023-04-13 14:37:19 +02:00
Arnaud Cogoluègnes ac20b90a05 Squash Dialyzer warning 2023-04-04 19:32:57 +04:00
Arnaud Cogoluègnes 4a669e1450 Fix field type definition
References #7743
2023-04-04 19:32:57 +04:00
Arnaud Cogoluègnes 763acc2fad Introduce timeout for stream server-side requests
The stream plugin can send frames to a client connection
and expect a response from it. This is used currently
for the consumer_update frame (single active consumer feature).
There was no timeout mechanism so far, so a slow or blocked
application could prevent a group of consumers to move on.

This commit introduces a timeout mechanism: if the expected
response takes too long to arrive, the server assumes the
connection is blocked and closes it.

The default timeout is 60 seconds but it can be changed by setting
the request_timeout parameter of the rabbitmq_stream application.

Note the mechanism does not enforce the exact duration of the timeout,
as a timer is set for the first request and re-used for other requests.
With bad timing, a request can time out after twice as long
as the set-up timeout.

References #7743
2023-04-04 19:32:57 +04:00
Arnaud Cogoluègnes 63d1ed9dfb Add meck to SAC coordinator test dependencies
And squash a dialyzer warning.
2023-04-04 19:32:57 +04:00
Arnaud Cogoluègnes 70538c587f Unblock group of consumers on super stream partition
A group of consumers on a super stream can end up blocked
without an active consumer. This can happen with consumer
churn: one consumer gets removed, which makes the active
consumer passive, but the former active consumer never
gets to know because it has been removed itself.

This commit changes the structure of the messages the SAC
coordinator sends to consumer connections, to embed enough
information to look up the group and to instruct it to choose
a new active consumer when the race condition mentioned above
comes up.

Because of the changes in the structure of messages, a feature
flag is required to make sure the SAC coordinator starts
sending the new messages only when all the nodes have been upgraded.

References #7743
2023-04-04 19:32:57 +04:00
David Ansari b4871e2b83
Update PROTOCOL.adoc 2023-03-21 11:58:50 +01:00
Arnaud Cogoluègnes e9f2fa41ce
Take credits for inactive stream subscription
Not taking the credits can starve the subscription,
making it permanently under its credit send limit.

The subscription then never dispatches messages when
it becomes active again.

This happens in an active-inactive-active cycle, especially
with slow consumers.
2023-03-16 11:59:58 +01:00
David Ansari 4d7403cb62 Delete rabbit_stream_SUITE setup steps
since these feature flags became required in 3.12.
2023-02-15 17:01:59 +00:00
Michael Klishin d0dc951343
Merge pull request #7058 from rabbitmq/add-node-lists-functions-to-clarify-intent
rabbit_nodes: Add list functions to clarify which nodes we are interested in
2023-02-13 23:06:50 -03:00
Michael Klishin 054381a99b
Merge pull request #7269 from rabbitmq/ff-stream_queue
Remove compatibility for feature flag stream_queue
2023-02-13 22:35:08 -03:00
David Ansari 575f4e78bc Remove compatibility for feature flag stream_queue
Remove compatibility code for feature flag `stream_queue`
because this feature flag is required in 3.12.

See #7219
2023-02-13 15:31:40 +00:00
Jean-Sébastien Pédron d65637190a
rabbit_nodes: Add list functions to clarify which nodes we are interested in
So far, we had the following functions to list nodes in a RabbitMQ
cluster:
* `rabbit_mnesia:cluster_nodes/1` to get members of the Mnesia cluster;
  the argument was used to select members (all members or only those
  running Mnesia and participating in the cluster)
* `rabbit_nodes:all/0` to get all members of the Mnesia cluster
* `rabbit_nodes:all_running/0` to get all members who currently run
  Mnesia

Basically:
* `rabbit_nodes:all/0` calls `rabbit_mnesia:cluster_nodes(all)`
* `rabbit_nodes:all_running/0` calls `rabbit_mnesia:cluster_nodes(running)`

We also have:
* `rabbit_node_monitor:alive_nodes/1` which filters the given list of
  nodes to only select those currently running Mnesia
* `rabbit_node_monitor:alive_rabbit_nodes/1` which filters the given
  list of nodes to only select those currently running RabbitMQ

Most of the code uses `rabbit_mnesia:cluster_nodes/1` or the
`rabbit_nodes:all*/0` functions. `rabbit_mnesia:cluster_nodes(running)`
or `rabbit_nodes:all_running/0` is often used as a close approximation
of "all cluster members running RabbitMQ". This list might be incorrect
in times where a node is joining the clustered or is being worked on
(i.e. Mnesia is running but not RabbitMQ).

With Khepri, there won't be the same possible approximation because we
will try to keep Khepri/Ra running even if RabbitMQ is stopped to
expand/shrink the cluster.

So in order to clarify what we want when we query a list of nodes, this
patch introduces the following functions:
* `rabbit_nodes:list_members/0` to get all cluster members, regardless
  of their state
* `rabbit_nodes:list_reachable/0` to get all cluster members we can
  reach using Erlang distribution, regardless of the state of RabbitMQ
* `rabbit_nodes:list_running/0` to get all cluster members who run
  RabbitMQ, regardless of the maintenance state
* `rabbit_nodes:list_serving/0` to get all cluster members who run
  RabbitMQ and are accepting clients

In addition to the list functions, there are the corresponding
`rabbit_nodes:is_*(Node)` checks and `rabbit_nodes:filter_*(Nodes)`
filtering functions.

The code is modified to use these new functions. One possible
significant change is that the new list functions will perform RPC calls
to query the nodes' state, unlike `rabbit_mnesia:cluster_nodes(running)`.
2023-02-13 12:58:40 +01:00
David Ansari 2f76102603 Remove compatibility for flag stream_single_active_consumer
Remove compatibility code for feature flag `stream_single_active_consumer`
because this feature flag is `required` in 3.12.

See https://github.com/rabbitmq/rabbitmq-server/pull/7219
2023-02-13 11:52:25 +00:00
David Ansari 5045fce6de Require all feature flags introduced before 3.11.1
RabbitMQ 3.12 requires feature flag `feature_flags_v2` which got
introduced in 3.11.0 (see
https://github.com/rabbitmq/rabbitmq-server/pull/6810).

Therefore, we can mark all feature flags that got introduced in 3.11.0
or before 3.11.0 as required because users will have to upgrade to
3.11.x first, before upgrading to 3.12.x

The advantage of marking these feature flags as required is that we can
start deleting any compatibliy code for these feature flags, similarly
as done in https://github.com/rabbitmq/rabbitmq-server/issues/5215

This list shows when a given feature flag was first introduced:

```
classic_mirrored_queue_version 3.11.0
stream_single_active_consumer 3.11.0
direct_exchange_routing_v2 3.11.0
listener_records_in_ets 3.11.0
tracking_records_in_ets 3.11.0

empty_basic_get_metric 3.8.10
drop_unroutable_metric 3.8.10
```

In this commit, we also force all required feature flags in Erlang
application `rabbit` to be enabled in mixed version cluster testing
and delete any tests that were about a feature flag starting as disabled.

Furthermore, this commit already deletes the callback (migration) functions
given they do not run anymore in 3.12.x.

All other clean up (i.e. branching depending on whether a feature flag
is enabled) will be done in separate commits.
2023-02-08 16:00:03 +00:00
David Ansari 79c12b60bc Use maybe expression instead of messy patterns
This commit is pure refactoring making the code base more maintainable.

Replace rabbit_misc:pipeline/3 with the new OTP 25 experimental maybe
expression because
"Frequent ways in which people work with sequences of failable
operations include folds over lists of functions, and abusing list
comprehensions. Both patterns have heavy weaknesses that makes them less
than ideal."
https://www.erlang.org/eeps/eep-0049#obsoleting-messy-patterns

Additionally, this commit is more restrictive in the type spec of
rabbit_mqtt_processor state fields.
Specifically, many fields were defined to be `undefined | T` where
`undefined` was only temporarily until the first CONNECT packet was
processed by the processor.
It's better to initialise the MQTT processor upon first CONNECT packet
because there is no point in having a processor without having received
any packet.
This allows many type specs in the processor to change from `undefined |
T` to just `T`.
Additionally, memory is saved by removing the `received_connect_packet`
field from the `rabbit_mqtt_reader` and `rabbit_web_mqtt_handler`.
2023-02-07 16:36:08 +01:00
Alexey Lebedeff c7da0da8b8 Cleanup dialyzer calls
- Use the same base .plt everywhere, so there is no need to list
standard apps everywhere
- Fix typespecs: some typos and the use of not-exported types
2023-02-06 17:05:30 +01:00
Arnaud Cogoluègnes f558d80e3d
Activate stream delivery v2 correctly
As soon as v2 is supported, whatever the min supported
version is.
2023-01-31 16:49:48 +01:00
David Ansari 1f106fcd98 Fix wrong and add missing type specs 2023-01-25 17:13:54 +00:00
David Ansari 9c2f5975ea Support tracing in Native MQTT 2023-01-24 17:32:59 +00:00
David Ansari a8b69b43c1 Fix dialyzer issues and add function specs
Fix all dialyzer warnings in rabbitmq_mqtt and rabbitmq_web_mqtt.

Add more function specs.
2023-01-24 17:32:58 +00:00
David Ansari 56e97a9142 Fix MQTT in management plugin
1. Allow to inspect an (web) MQTT connection.
2. Show MQTT client ID on connection page as part of client_properties.
3. Handle force_event_refresh (when management_plugin gets enabled
   after (web) MQTT connections got created).
4. Reduce code duplication between protocol readers.
5. Display '?' instead of 'NaN' in UI for absent queue metrics.
6. Allow an (web) MQTT connection to be closed via management_plugin.

For 6. this commit takes the same approach as already done for the stream
plugin:
The stream plugin registers neither with {type, network} nor {type,
direct}.
We cannot use gen_server:call/3 anymore to close the connection
because the web MQTT connection cannot handle gen_server calls (only
casts).
Strictly speaking, this commit requires a feature flag to allow to force
closing stream connections from the management plugin during a rolling
update. However, given that this is rather an edge case, and there is a
workaround (connect to the node directly hosting the stream connection),
this commit will not introduce a new feature flag.
2023-01-24 17:30:10 +00:00
Rin Kuryloski 08d641a1a9 Fix dialyzer warnings revealed from previous commit 2023-01-19 17:29:29 +01:00
Rin Kuryloski b84e746ee9 Rework plt/dialyze for rabbitmqctl and plugins that depend on it
This allows us to stop ignorning undefined callback warnings

When mix compiles rabbitmqctl, it produces a 'consolidated' directory
alongside the 'ebin' dir. Some of the modules in consolidated are
intended to be used instead of those provided by elixir. We now handle
the conflicts properly in the bazel build.
2023-01-19 17:29:23 +01:00
Rin Kuryloski 1a9f3f4ffd
Merge pull request #6928 from rabbitmq/rin/use-new-rules_erlang-macros
Simplify BUILD files by using new macros from rules_erlang 3.9.0
2023-01-19 10:03:42 +01:00
Michal Kuratczyk b8691b720b
Merge pull request #6862 from rabbitmq/small-chunks-opts
Move nopush to reader to try to better make use of packets
2023-01-19 09:01:24 +01:00
Rin Kuryloski 5ef8923462 Avoid the need to pass package name to rabbitmq_integration_suite 2023-01-18 15:25:27 +01:00
Rin Kuryloski a317b30807 Use improved assert_suites2 macro from rules_erlang 3.9.0 2023-01-18 15:07:06 +01:00
Karl Nilsson 83880154de Streams: Move nopush to reader to try to combine small chunks into larger IP packets.
Also change consumer credit top-ups to delay calling send_chunks until there is a "batch"
of credit to consume. Most clients at the time of writing send single credit updates after receiving each chunk so here we won't enter the send loop unless there are more than half the initial credits available.

osiris v1.4.3
2023-01-17 14:01:42 +00:00
Alexey Lebedeff 2c4e4fb691 Fix all dialyzer warnings in rabbitmq_stream
There are some elixir-related messages about undefined functions, but
they don't produce warnings (yet).
2023-01-16 17:11:24 +01:00
Arnaud Cogoluègnes 3fea81a7ee
Remove console output 2023-01-04 15:56:49 +01:00
Arnaud Cogoluègnes d2443b5f49
Filter out nodes in maintenance in stream metadata
We don't want clients to try to connect to those.

Fixes #3370
2023-01-04 15:54:20 +01:00
Arnaud Cogoluègnes 79990e8eae
Format stream plugin code 2023-01-04 10:47:16 +01:00
Michael Klishin ec4f1dba7d
(c) year bump: 2022 => 2023 2023-01-01 23:17:36 -05:00
Arnaud Cogoluègnes c1ab6b42d4
Use assertMatch instead of assertEqual in test
Make test less fragile.
2022-11-17 15:52:01 +01:00
Karl Nilsson e4921503db Streams: close osiris log on unsubscribe
To ensure we clean up any log resources such as counter records.
2022-11-03 16:54:31 +00:00
Luke Bakken 7fe159edef
Yolo-replace format strings
Replaces `~s` and `~p` with their unicode-friendly counterparts.

```
git ls-files *.erl | xargs sed -i.ORIG -e s/~s>/~ts/g -e s/~p>/~tp/g
```
2022-10-10 10:32:03 +04:00
Aitor Pérez Cedres 1ce96c0ae5
Update Stream protocol document
As per conversation with @acogoluegnes, the frame `SaslHandshakeRequest` does not have a field `mechanism`. This field actually used in `SaslHandshakeResponse` and `SaslAuthenticateRequest` frames.
2022-10-05 17:28:59 +01:00
Michael Klishin 796902a5a5
Merge pull request #5961 from rabbitmq/rabbitmq-server-5956-stream-route-partitions-response-prefix
Fix keys for route and partitions responses
2022-10-03 17:32:56 +04:00
Arnaud Cogoluègnes c33e4ae3c6
Use leader locator "balanced" by default for add_super_stream
For the CLI, otherwise all the created individual streams end
up with their leader on the same node, whichi is not what we
want. "balanced" is a reasonable default and it can be overridden
anyway.
2022-10-03 11:53:06 +02:00
Arnaud Cogoluègnes 0c1eeab92b
Fix keys for route and partitions responses
These 2 responses were not using the `rabbit_stream_core` utilities
so they were not using the "prefix" for responses.

Fixes #5956
2022-10-03 09:42:40 +02:00
Michal Kuratczyk 2855278034
Migrate from supervisor2 to supervisor 2022-09-27 13:53:06 +02:00
Arnaud Cogoluègnes f92b2a3d7d
Complement rabbitmq-stream.8 with SAC and super streams 2022-09-23 09:57:42 +02:00
Ayanda Dube 767f0c9e4a Rename and fix rabbit_stream_metrics_gc childspec identifier from supervisor context 2022-09-12 23:32:01 +01:00
David Ansari b953b0f10e Stop sending stats to rabbit_event
Stop sending connection_stats from protocol readers to rabbit_event.
Stop sending queue_stats from queues to rabbit_event.
Sending these stats every 5 seconds to the event manager process is
superfluous because noone handles these events.

They seem to be a relict from before rabbit_core_metrics ETS tables got
introduced in 2016.

Delete test head_message_timestamp_statistics because it tests that
head_message_timestamp is set correctly in queue_stats events
although queue_stats events are used nowhere.
The functionality of head_message_timestamp itself is still tested in
deps/rabbit/test/priority_queue_SUITE.erl and
deps/rabbit/test/temp/head_message_timestamp_tests.py
2022-09-09 10:52:38 +00:00
Arnaud Cogoluègnes 20216ef3c1
Remove experimental warning in super stream commands 2022-09-06 15:42:12 +02:00
Arnaud Cogoluègnes fa528115b1
Bump dependencies in stream Java test 2022-08-10 09:56:58 +02:00
Michael Klishin 68969faf94
Streams: adapt tests to the latest Java stream client listener interface 2022-08-10 11:43:44 +04:00
Arnaud Cogoluègnes 26b4e6fa41
Use atom_to_binary/1 instead of rabbit_data_coercion 2022-08-08 18:09:58 +02:00
Arnaud Cogoluègnes 93c33f2423
Rename StreamInfo to StreamStats
Other changes: returns a map of int64, use the new osiris:get_stats/1 API.

References #5412
2022-08-08 18:02:51 +02:00