The same connection can contain several consumers belonging to a SAC
group (group key = vhost + stream + consumer name). The whole new group
must be re-evaluated to select a new active consumer after the consumers
of the down connection are removed from it.
The previous behavior would not re-evaluate the new group and could
select a consumer from the down connection, letting the group with only
inactive consumers, as the selected active consumer would never receive
the activation message from the stream SAC coordinator.
This commit fixes this problem by removing the consumers of the down
down connection from the affected groups and then performing the
appropriate operations for the groups to keep on consuming (e.g.
notifying an active consumer that it needs to step down).
References #13372
Prior to this commit, when a client consumed from an unavailable quorum
queue, the following crash occurred:
```
{badmatch,{error,noproc}}
[{rabbit_quorum_queue,consume,3,[{file,\"rabbit_quorum_queue.erl\"},{line,993}]}
```
This commit fixes this bug by returning any error when registering a
quorum queue consumer to rabbit_queue_type.
This commit also refactors errors returned by
rabbit_queue_type:consume/3 to simplify and ensure seperation of
concerns.
For example prior to this commit, the channel did error
formatting specifically for consuming from streams. It's better if
the channel is unaware of what queue type it consumes from and have each
queue type implementation format their own errors.
Bump the timeout for management operations and link attachments from 20s
to 30s. We've seen timeouts in CI.
We bump the poll interval of the `?awaitMatch` macro because CI
sometimes flaked by crashing in
0e803de6dd/deps/rabbitmq_amqp_client/src/rabbitmq_amqp_client.erl (L411)
which indicates that the client lib received a response from a previous
request.
To take more frequent checkpoints for large message workload
Lower the min_checkpoint_interval substantially to allow quorum queues
better control over when checkpoints are taken.
Track bytes enqueued in the aux state and suggest a checkpoint after
every 64MB enqueued (this value is scaled according to backlog just
like the indexes condition).
This should help with more timely checkpointing when very large
messages is used.
Try evaluating byte size independently of time window
also increase max size
This commit fixes a bug in the Erlang AMQP 1.0 client.
Prior to this commit, to repro this bug:
1. Send more than 2^16 messages to a queue.
2. Grant more than a total of 2^16 link credit initially (on a single link
or across multiple links) on a single session without any
auto or manual link credit renewal.
The expectation is that thanks to sufficiently granted initial link-credit,
the client will receive all messages.
However, consumption stops after exactly 2^16-1 messages.
That's because the client lib was never sending a flow frame to the server.
So, after the client received all 2^16-1 messages (the initial
incoming-window set by the client), the server's remote-incoming-window
reached 0 causing the server to stop delivering messages.
The expectation is that the client lib automatically handles session
flow control without any manual involvement of the client app.
This commit implements this fix:
* We keep the server's remote-incoming window always large by default as
explained in https://www.rabbitmq.com/blog/2024/09/02/amqp-flow-control#incoming-window
* Hence, the client lib sets its incoming-window to 100,000 initially.
* The client lib tracks its incoming-window decrementing it by 1 for
every transfer it received. (This wasn't done prior to this commit.)
* Whenever this window shrinks below 50,000, the client sends a flow
frame without any link information widening its incoming-window back to 100,000.
* For test cases (maybe later for apps as well), there is a new function
`amqp10_client_session:flow/3`, which allows for a test case to do manual
session flow control. Its API is designed very similar to
`amqp10_client_session:flow_link/4` in that the test can optionally request
the lib to auto widen the session window whenever it falls below a certain threshold.
* RMQ-1263: Check if queue protected from deleted inside rabbit_amqqueue:with_delete
Delayed exchange automatically manages associated Delayed Queue. We don't want users to delete it accidentally.
If queue is indeed protected its removal can be forced by calling with
?INTERNAL_USER as ActingUser.
* RMQ-1263: Correct a type spec of amqqueue:internal_owner/1
* RMQ-1263: Add protected queues test
---------
Co-authored-by: Iliia Khaprov <iliia.khaprov@broadcom.net>
Co-authored-by: Michael Klishin <klishinm@vmware.com>
(cherry picked from commit 97f44adfad6d0d98feb1c3a47de76e72694c19e0)
* Implement rabbitmq-queues leader_health_check command for quorum queues
(cherry picked from commit c26edbef33)
* Tests for rabbitmq-queues leader_health_check command
(cherry picked from commit 6cc03b0009)
* Ensure calling ParentPID in leader health check execution and
reuse and extend formatting API, with amqqueue:to_printable/2
(cherry picked from commit 76d66a1fd7)
* Extend core leader health check tests and update badrpc error handling in cli tests
(cherry picked from commit 857e2a73ca)
* Refactor leader_health_check command validators and ignore vhost arg
(cherry picked from commit 6cf9339e49)
* Update leader_health_check_command description and banner
(cherry picked from commit 96b8bced2d)
* Improve output formatting for healthy leaders and support
silent mode in rabbitmq-queues leader_health_check command
(cherry picked from commit 239a69b404)
* Support global flag to run leader health check for
all queues in all vhosts on local node
(cherry picked from commit 48ba3e161f)
* Return immediately for leader health checks on empty vhosts
(cherry picked from commit 7873737b35)
* Rename leader health check timeout refs
(cherry picked from commit b7dec89b87)
* Update banner message for global leader health check
(cherry picked from commit c7da4d5b24)
* QQ leader-health-check: check_process_limit_safety before spawning leader checks
(cherry picked from commit 17368454c5)
* Log leader health check result in broker logs (if any leaderless queues)
(cherry picked from commit 1084179a2c)
* Ensure check_passed result for leader health internal calls)
(cherry picked from commit 68739a6bd2)
* Extend CLI format output to process check_passed payload
(cherry picked from commit 5f5e9922bd)
* Format leader healthcheck result log and function exports
(cherry picked from commit ebffd7d8a4)
* Change leader_health_check command scope from queues to diagnostics
(cherry picked from commit 663fc9846e)
* Update (c) line year
(cherry picked from commit df82f12a70)
* Rename command to check_for_quorum_queues_without_an_elected_leader
and use across_all_vhosts option for global checks
(cherry picked from commit b2acbae28e)
* Use rabbit_db_queue for qq leader health check lookups
and introduce rabbit_db_queue:get_all_by_type_and_vhost/2.
Update leader health check timeout to 5s and process limit
threshold to 20% of node's process_limit.
(cherry picked from commit 7a8e166ff6)
* Update tests: quorum_queue_SUITE and rabbit_db_queue_SUITE
(cherry picked from commit 9bdb81fd79)
* Fix typo (cli test module)
(cherry picked from commit 615856853a)
* Small refactor - simpler final leader health check result return on function head match
(cherry picked from commit ea07938f3d)
* Clear dialyzer warning & fix type spec
(cherry picked from commit a45aa81bd2)
* Ignore result without strict match to avoid diayzer warning
(cherry picked from commit bb43c0b929)
* 'rabbitmq-diagnostics check_for_quorum_queues_without_an_elected_leader' documentation edits
(cherry picked from commit 845230b0b380a5f5bad4e571a759c10f5cc93b91)
* 'rabbitmq-diagnostics check_for_quorum_queues_without_an_elected_leader' output copywriting
(cherry picked from commit 235f43bad58d3a286faa0377b8778fcbe6f8705d)
* diagnostics check_for_quorum_queues_without_an_elected_leader: behave like a health check w.r.t. error reporting
(cherry picked from commit db7376797581e4716e659fad85ef484cc6f0ea15)
* check_for_quorum_queues_without_an_elected_leader: handle --quiet and --silent
plus simplify function heads.
References #13433.
(cherry picked from commit 7b392315d5e597e5171a0c8196230d92b8ea8e92)
---------
Co-authored-by: Ayanda Dube <adube14@bloomberg.net>
```
make -C deps/rabbit ct-rabbit_stream_queue t=cluster_size_3_parallel_1 RABBITMQ_METADATA_STORE=mnesia
```
flaked prior to this commit locally on Ubuntu with the following error after 11 runs:
```
rabbit_stream_queue_SUITE > cluster_size_3_parallel_1 > consume_from_replica
{error,
{{shutdown,
{server_initiated_close,406,
<<"PRECONDITION_FAILED - stream queue 'consume_from_replica' in vhost '/' does not have a running replica on the local node">>}},
{gen_server,call,
[<0.8365.0>,
{subscribe,
{'basic.consume',0,<<"consume_from_replica">>,
<<"ctag">>,false,false,false,false,
[{<<"x-stream-offset">>,long,0}]},
<0.8151.0>},
infinity]}}}
```
[Why]
During mixed-version testing, the old node might not be able to join or
rejoin a cluster if the other nodes run a newer Khepri machine version.
[How]
The old node is used as the cluster seed node and is never touched
otherwise. Other nodes are restarted or join the cluster later.