New CLI command to trigger a rebalancing in a SAC group and activate a
consumer. This is a last resort solution if all consumers in a group
accidently end up in {connected, waiting} state.
The command re-uses an existing function, which only picks the consumer
that should be active. This means it does not try to "fix" the state
(e.g. removing a disconnected consumer because its node is definitely
gone from the cluster).
Fixes#14055
A boolean status in the stream SAC coordinator is not enough to follow
the evolution of a consumer. For example a former active consumer that
is stepping down can go down before another consumer in the group is
activated, letting the coordinator expect an activation request that
will never arrive, leaving the group without any active consumer.
This commit introduces 3 status: active (formerly "true"), waiting
(formerly "false"), and deactivating. The coordinator will now know when
a deactivating consumer goes down and will trigger a rebalancing to
avoid a stuck group.
This commit also introduces a status related to the connectivity state
of a consumer. The possible values are: connected, disconnected, and
presumed_down. Consumers are by default connected, they can become
disconnected if the coordinator receives a down event with a
noconnection reason, meaning the node of the consumer has been
disconnected from the other nodes. Consumers can become connected again when
their node joins the other nodes again.
Disconnected consumers are still considered part of a group, as they are
expected to come back at some point. For example there is no rebalancing
in a group if the active consumer got disconnected.
The coordinator sets a timer when a disconnection occurs. When the timer
expires, corresponding disconnected consumers pass into the "presumed
down" state. At this point they are no longer considered part of their
respective group and are excluded from rebalancing decision. They are expected
to get removed from the group by the appropriate down event of a
monitor.
So the consumer status is now a tuple, e.g. {connected, active}. Note
this is an implementation detail: only the stream SAC coordinator deals with
the status of stream SAC consumers.
2 new configuration entries are introduced:
* rabbit.stream_sac_disconnected_timeout: this is the duration in ms of the
disconnected-to-forgotten timer.
* rabbit.stream_cmd_timeout: this is the timeout in ms to apply RA commands
in the coordinator. It used to be a fixed value of 30 seconds. The
default value is still the same. The setting has been introduced to
make integration tests faster.
Fixes#14070
Trigger a 4.2.x alpha release build / trigger_alpha_build (push) Has been cancelledDetails
Test (make) / Build and Xref (1.17, 26) (push) Has been cancelledDetails
Test (make) / Build and Xref (1.17, 27) (push) Has been cancelledDetails
Test (make) / Test (1.17, 27, khepri) (push) Has been cancelledDetails
Test (make) / Test (1.17, 27, mnesia) (push) Has been cancelledDetails
Test (make) / Test mixed clusters (1.17, 27, khepri) (push) Has been cancelledDetails
Test (make) / Test mixed clusters (1.17, 27, mnesia) (push) Has been cancelledDetails
Test (make) / Type check (1.17, 27) (push) Has been cancelledDetails
## What?
PR #13971 added a property test that applies the same quorum queue Raft
command on different quorum queue members on different Erlang nodes
ensuring that the state machine ends up in exaclty the same state.
The different Erlang nodes run the **same** Erlang/OTP version however.
This commit adds another property test where the different Erlang nodes
run **different** Erlang/OTP versions.
## Why?
This test allows spotting any non-determinism that could occur when
running quorum queue members in a mixed version cluster, where mixed
version means in our context different Erlang/OTP versions.
## How?
CI runs currently tests with Erlang 27.
This commit starts an Erlang 26 node in docker, specifically for the
`rabbit_fifo_prop_SUITE`.
Test case `two_nodes_different_otp_version` running Erlang 27 then transfers
a few Erlang modules (e.g. module `rabbit_fifo`) to the Erlang 26 node.
The test case then runs the Ra commands on its own node in Erlang 27 and
on the Erlang 26 node in Docker.
By default, this test case is skipped locally.
However, to run this test case locally, simply start an Erlang node as
follows:
```
erl -sname rabbit_fifo_prop@localhost
```
For test case leader_locator_balanced the actual leaders elected were
nodes 1, 3, 1 because they know about machine version 6 while node 2
only knows about machine version 5.
This commit adds a property test that applies the same Ra commands in
the same order on two different Erlang nodes. The state in which both nodes end
up should be exactly the same.
Ideally, the two nodes should run different OTP versions because this
way we could test for any non-determinism across OTP versions.
However, for now, having a test with both nodes having the same OTP
verison is good enough because running this test with rabbit_fifo
machine version 5 fails while machine version 6 succeeds.
This reveales another interesting: The default "undefined" map order can
even be different using different Erlang nodes with the **same** OTP
version.
Test case `tcp_back_pressure_rabbitmq_internal_flow_quorum_queue` succeeds
consistently locally on macOS and fails consistently in CI since 30 May
2025.
CI also shows a test failure instance of `tcp_back_pressure_rabbitmq_internal_flow_classic_queue`, albeit much rearer.
This test case succeeds in CI when using ubuntu-22.04 but fails with ubuntu-24.04.
Even before 30 May 2025, ubuntu-24.04 was used. However the GitHub runner
version was updated from Version: 20250511.1.0 to Version: 20250527.1.0
which presumably started to cause this test to fail.
This hypothesis cannot be validated because the GitHub actions
definitions YAML file doesn't provide a means to configure this version.
File `images/ubuntu/Ubuntu2404-Readme.md` in https://github.com/actions/runner-images/compare/ubuntu24/20250511.1...ubuntu24/20250527.1 shows the diff.
The most notable changes are probably the kernel version change from Kernel Version: 6.11.0-1013-azure to Kernel Version: 6.11.0-1015-azure and some changes to file `images/ubuntu/scripts/build/configure-environment.sh`
There seem to be no RabbitMQ related changes causing this test to fail
because this test also fails with an older RabbitMQ version with the new runner
Version: 20250527.1.0.
Neither `meck` nor `inet:setopts(Socket, [{active, once}])` cause the
test failure because the test also fails with the former
`erlang:suspend_process/1` and `erlang:resume_process/1`.
The test fails due to the following timeout in the writer proc on the
server:
```
** Last message in was {'$gen_cast',
{send_command,<0.760.0>,0,
{'v1_0.transfer',
{uint,3},
{uint,2211},
{binary,<<0,0,8,162>>},
{uint,0},
true,undefined,undefined,undefined,
undefined,undefined,undefined},
<<"xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx">>}}
** When Server state == #{pending => 3510,socket => #Port<0.49>,
reader => <0.755.0>,
monitored_sessions => [<0.760.0>],
pending_size => 3510}
** Reason for termination ==
** {{writer,send_failed,timeout},
[{rabbit_amqp_writer,flush,1,
[{file,"src/rabbit_amqp_writer.erl"},{line,250}]},
{rabbit_amqp_writer,handle_cast,2,
[{file,"src/rabbit_amqp_writer.erl"},{line,106}]},
{gen_server,try_handle_cast,3,[{file,"gen_server.erl"},{line,2371}]},
{gen_server,handle_msg,6,[{file,"gen_server.erl"},{line,2433}]},
{proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,329}]}]}
```
For unknown reasons, even after the CT test case resumes consumption,
the server still times out writing to the socket.
The most important test expectation that is kept in place is that the
server won't send all the messages if the client can't receive fast
enough.
This is mostly the same as the `messages_total` property test but checks
that the Raft indexes in `ra_indexes` are the set of the indexes checked
out by all consumers union any indexes in the `returns` queue. This is
the intended state of `ra_indexes` and failing this condition could
cause bugs that would prevent snapshotting.
The AMQP spec defines:
```
<field name="durable" type="boolean" default="false"/>
```
RabbitMQ 4.0 and 4.1 interpret the durable field as true if not set.
The idea was to favour safety over performance.
This complies with the AMQP spec because the spec allows other target or
node specific defaults for the durable field:
> If the header section is omitted the receiver MUST assume the appropriate
> default values (or the meaning implied by no value being set) for the fields
> within the header unless other target or node specific defaults have otherwise
> been set.
However, some client libraries completely omit the header section if the
app expliclity sets durable=false. This complies with the spec, but it
means that RabbitMQ cannot diffentiate between "client app forgot to set
the durable field" vs "client lib opted in for an optimisation omitting
the header section".
This is problematic with JMS message selectors where JMS apps can filter
on JMSDeliveryMode. To be able to correctly filter on JMSDeliveryMode,
RabbitMQ needs to know whether the JMS app sent the message as
PERSISTENT or NON_PERSISTENT.
Rather than relying on client libs to always send the header section
including the durable field, this commit makes RabbitMQ comply with the
default value for durable in the AMQP spec.
Some client lib maintainers accepted to send the header section, while
other maintainers refused to do so:
https://github.com/Azure/go-amqp/issues/330https://issues.apache.org/jira/browse/QPIDJMS-608
Likely the AMQP spec was designed to omit the header section when
performance is important, as is the case with durable=false. Omitting
the header section means saving a few bytes per message on the wire and
some marshalling and unmarshalling overhead on both client and server.
Therefore, it's better to push the "safe by default" behaviour from the broker
back to the client libs. Client libs should send messages as durable by
default unless the client app expliclity opts in to send messages as
non-durable. This is also what JMS does: By default JMS apps send
messages as PERSISTENT:
> The message producer's default delivery mode is PERSISTENT.
Therefore, this commit also makes the AMQP Erlang client send messages as
durable, by default.
This commit will apply to RabbitMQ 4.2.
It's arguably not a breaking change because in RabbitMQ, message durability
is actually more determined by the queue type the message is sent to rather than the
durable field of the message:
* Quroum queues and streams store messages durably (fsync or replicate)
no matter what the durable field is
* MQTT QoS 0 queues hold messages in memory no matter what the
durable field is
* Classic queues do not fsync even if the durable field is set to true
In addition, the RabbitMQ AMQP Java library introduced in RabbitMQ 4.0 sends messages with
durable=true:
53e3dd6abb/src/main/java/com/rabbitmq/client/amqp/impl/AmqpPublisher.java (L91)
The tests for selecting messages by JMSDeliveryMode relying on the
behaviour in this commit can be found on the `jms` branch.
At CQ startup variable_queue went through each seqid from 0 to
next_seq_id looking for the first message even if there were no
messages in the queue (no segment files).
In case of a clean shutdown the value next_seq_id is stored in
recovery terms. This value can be utilized by the queue index to
provide better seqid bounds in absence of segment files.
Before this patch starting an empty classic queue with next_seq_id =
100_000_000 used to take about 26 seconds. With this patch it takes
less than 1ms.
[Why]
They were moved from `rabbit` to `rabbit_common` several years ago to
solve an dependency issue because `amqp_client` depended on the file
handle cache. This is not the case anymore.
[How]
The modules are moved back to `rabbit`.
`rabbit_common` doesn't need to depend on `os_mon` anymore. `rabbit`
already depends on it, so no changes needed here.
`include/rabbit_memory.hrl` and some test cases are moved as well to
follow the `vm_memory_monitor` module.
Consumers with a same name, consuming from the same stream should have
the same partition index. This commit adds a check to enforce this rule
and make the subscription fail if it does not comply.
Fixes#13835
[Why]
The `rabbit_khepri` module grew during the work to add Khepri support to
RabbitMQ and while Khepri was itself written. The current code is
therefore unorganized.
[How]
This commit tries to change proxy functions to be close to their Khepri
equivalent.
The module continues to set non-default options for write functions. We
also add the variants that take an option map to be consistent and not
have to deal with that in the future.
Several legacy functions were removed, either because they were no
longer called or because they were replace by a regular Khepri call.