Retry unregistering a stream from its group in case of stream
coordinator timeout/unavailability. The operation can fail during or
after a network partition, which is normal, and it is harmless to
retry it to clean up the SAC group: the operation is idempotent anyway.
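The retry-on-ambiguous-failure pattern described above can be sketched
as follows. This is an illustrative Python sketch, not the actual
Erlang code; the `operation`, `attempts`, and `delay` names are
assumptions made for the example.

```python
import time

def retry(operation, attempts=3, delay=0.1):
    """Run an idempotent operation, retrying on transient errors.

    Because the operation is idempotent, running it again after an
    ambiguous failure (e.g. a timeout during a network partition) is
    harmless: a retry that follows a hidden success is a no-op.
    """
    last_error = None
    for _ in range(attempts):
        try:
            return operation()
        except TimeoutError as err:  # transient: coordinator unavailable
            last_error = err
            time.sleep(delay)
    raise last_error
```

The key design point is that retrying is only safe because the
unregister operation is idempotent: a duplicate unregister after a
success that the caller never saw changes nothing.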
Sometimes a plugin needs to list online peers
that are running, reachable, not under maintenance
and have a specific plugin enabled.
This commit introduces a few helper functions
to make such cluster member queries trivial.
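The kind of query these helpers enable can be sketched like this. A
hedged Python sketch only: the field names (`running`, `reachable`,
`maintenance`, `plugins`) are assumptions for illustration, not the
real Erlang API.

```python
def list_online_members(members, plugin):
    """Select cluster members that are running, reachable, not under
    maintenance, and have the given plugin enabled."""
    return [
        m for m in members
        if m["running"]
        and m["reachable"]
        and not m["maintenance"]
        and plugin in m["plugins"]
    ]
```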
This commit fixes the following test flake that occurred in CI:
```
make -C deps/rabbit ct-amqp_dotnet t=cluster_size_1:redelivery
```
After receiving the client's end frame, the server session proc replies with its own end frame.
Usually when the test case succeeds, the server connection process receives
a DOWN for the session proc and untracks its channel number such that a
subsequent begin frame for the same channel number will create a new session
proc in the server.
In the flake however, the client receives the end, and pipelines new begin,
attach, and flow frames. These frames are received in the server connection's
mailbox before the monitor for the old session proc fires. That's why
these new frames are sent to the old session proc, causing the test
case to fail.
This reveals a bug in the server.
This commit fixes this bug similarly to how it was done in the AMQP 0.9.1 channel in
94b4a6aafd/deps/rabbit/src/rabbit_channel.erl (L1146-L1155)
Channel reuse by the client is valid and actually common, e.g. if channel-max
is 0.
These functions will be used in the child commit for a check on
the number of exchanges. We can use the projection to avoid bothering
the Khepri process with a query.
[Why]
In CI, we sometimes get a failure when we try to forget node 3. The
CLI doesn't report the nature of the error unfortunately.
I suppose it's related to the fact that node 3 is stopped and forgotten
before all three replicas of the stream queue, declared earlier, were
ready. This is just a guess though; I have no proof that it is the
actual error.
[How]
We wait for the replicas after declaring the stream queue.
[Why]
Sometimes it returns `false` in CI. `meck:validate/1` can return false
if the module throws an exception. So perhaps it is a timing issue in
CI, where the runner is usually slower than our working computers?
[Why]
Sometimes, at least in CI, the output of the CLI is prepended with a
newline; sometimes it is not. This breaks the check of that output.
[How]
We just trim the output before parsing it. The parsing already takes
care of trimming internal whitespace.
Prior to this commit, the following test case flaked:
```
make -C deps/rabbitmq_mqtt ct-v5 t=cluster_size_1:session_upgrade_v3_v5_qos1
```
The test case failed with:
```
{v5_SUITE,session_upgrade_v3_v5_qos,1112}
{test_case_failed,Received unexpected PUBLISH payload. Expected: <<"2">> Got: <<"1">>}
```
The broker logs showed:
```
2025-07-15 15:50:23.914152+00:00 [debug] <0.758.0> MQTT accepting TCP connection <0.758.0> (127.0.0.1:38594 -> 127.0.0.1:27005)
2025-07-15 15:50:23.914289+00:00 [debug] <0.758.0> Received a CONNECT, client ID: session_upgrade_v3_v5_qos, username: undefined, clean start: false, protocol version: 3, keepalive: 60, property names: []
2025-07-15 15:50:23.914403+00:00 [debug] <0.758.0> MQTT connection 127.0.0.1:38594 -> 127.0.0.1:27005 picked vhost using plugin_configuration_or_default_vhost
2025-07-15 15:50:23.914480+00:00 [debug] <0.758.0> User 'guest' authenticated successfully by backend rabbit_auth_backend_internal
2025-07-15 15:50:23.914641+00:00 [info] <0.758.0> Accepted MQTT connection 127.0.0.1:38594 -> 127.0.0.1:27005 for client ID session_upgrade_v3_v5_qos
2025-07-15 15:50:23.914977+00:00 [debug] <0.758.0> Received a SUBSCRIBE with subscription(s) [{mqtt_subscription,
2025-07-15 15:50:23.914977+00:00 [debug] <0.758.0> <<"session_upgrade_v3_v5_qos">>,
2025-07-15 15:50:23.914977+00:00 [debug] <0.758.0> {mqtt_subscription_opts,1,false,
2025-07-15 15:50:23.914977+00:00 [debug] <0.758.0> false,0,undefined}}]
2025-07-15 15:50:23.924503+00:00 [debug] <0.764.0> MQTT accepting TCP connection <0.764.0> (127.0.0.1:38608 -> 127.0.0.1:27005)
2025-07-15 15:50:23.924922+00:00 [debug] <0.764.0> Received a CONNECT, client ID: session_upgrade_v3_v5_qos, username: undefined, clean start: false, protocol version: 5, keepalive: 60, property names: []
2025-07-15 15:50:23.925589+00:00 [error] <0.758.0> writing to MQTT socket #Port<0.63> failed: closed
2025-07-15 15:50:23.925635+00:00 [debug] <0.764.0> MQTT connection 127.0.0.1:38608 -> 127.0.0.1:27005 picked vhost using plugin_configuration_or_default_vhost
2025-07-15 15:50:23.925670+00:00 [info] <0.758.0> MQTT connection <<"127.0.0.1:38594 -> 127.0.0.1:27005">> will terminate because peer closed TCP connection
2025-07-15 15:50:23.925727+00:00 [debug] <0.764.0> User 'guest' authenticated successfully by backend rabbit_auth_backend_internal
2025-07-15 15:50:24.000790+00:00 [info] <0.764.0> Accepted MQTT connection 127.0.0.1:38608 -> 127.0.0.1:27005 for client ID session_upgrade_v3_v5_qos
2025-07-15 15:50:24.016553+00:00 [warning] <0.764.0> MQTT disconnecting client <<"127.0.0.1:38608 -> 127.0.0.1:27005">> with client ID 'session_upgrade_v3_v5_qos', reason: normal
```
This shows evidence that the MQTT server connection did not process the
DISCONNECT packet. The hypothesis is that the server connection did not
even process the PUBACK packet from the client. Hence, the first message
got requeued and re-delivered to the new v5 client.
This commit fixes this flake by not acking the first message.
Hence, we always expect that the first message will be redelivered to
the new v5 client.
... when testing vhost limits
[Why]
The tracking is asynchronous, thus the third MQTT connection might be
opened before the tracking is up-to-date, which the testcase doesn't
expect.
[Why]
In the `node_channel_limit` testcase, we open several channels and
verify the count of opened channels in all places but one: after the
first connection failure, when we try to open 3 channels.
Opening 3 channels in a row might not be tracked in time to reject the
third channel because the counter is updated asynchronously.
[How]
We simply wait for the counter to reach 5 before opening the third
channel.
We change all checks to use `?awaitMatch/3` in the process to be more
robust with timing issues.
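The wait-for-condition pattern that `?awaitMatch/3` provides can be
sketched like this. A hedged Python sketch of the general technique;
the name `await_match` and its parameters are assumptions, not the
Common Test macro itself.

```python
import time

def await_match(predicate, timeout=30.0, interval=0.05):
    """Poll until predicate() is truthy, then return its value.

    Asynchronously updated state (such as the channel-tracking
    counter here) may lag behind the action that changed it, so a
    test must wait for the expected value rather than assert once.
    """
    deadline = time.monotonic() + timeout
    while True:
        value = predicate()
        if value:
            return value
        if time.monotonic() >= deadline:
            raise AssertionError("condition not met within timeout")
        time.sleep(interval)
```

In the testcase this corresponds to waiting until the tracked channel
count reaches 5 before opening the next channel.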
[Why]
This test flaked today since the restart took 309ms, thus above
the allowed 100ms (outside of CI, it takes single-digit ms).
[How]
Increase the allowed time but also significantly increase next_seq_id.
This test exists because in the past we had an O(n) algorithm in CQ
recovery, leading to a slow recovery of even empty queues, if they had
a very large next_seq_id. Now that this operation is O(1), a much larger
next_seq_id shouldn't affect the time it takes to run
this test, while accidentally re-introducing an O(n) algorithm should
fail this test consistently.
[Why]
It looks like `erlang_vm_dist_node_queue_size_bytes` is not always
present, even though other Erlang-specific metrics are present.
[How]
The goal is to ensure Erlang metrics are present in the output, so just
use another one that is likely to be there.
[Why]
The tests relied on the `rabbit_ct_client_helpers` connection and
channel manager, which doesn't seem to be robust. So far it causes
more harm than good.
Hopefully, this will fix some test flakes in CI.
[Why]
The tests relied on the `rabbit_ct_client_helpers` connection and
channel manager, which doesn't seem to be robust. So far it causes
more harm than good.
Hopefully, this will fix some test flakes in CI.
[Why]
This was the only place where a condition was checked once after a
connection close, instead of waiting for it to become true.
This caused some transient failures in CI when the connection tracking
took a bit of time to update and the check was performed before that.
... in `remove_node_when_seed_node_is_leader/1` and
`remove_node_when_seed_node_is_follower/1`.
[Why]
The check was performed after the partition so far. This was incorrect:
if a cluster change was not permitted at the time of the partition, it
would not be permitted afterwards either. Thus there was a race
condition here.
[How]
Now, the check is performed before the partition.
Thanks to this new approach, we are sure of the state of node A and
don't need the case block near the end of the test cases.
This should fix some test flakes we see locally and in CI.
[Why]
The default checkpoint interval is 16384. Therefore with 20,000
messages published by the testcase, there is a chance a checkpoint is
created. This would hit an assertion in the testcase which expects no
checkpoints before it forces the creation of one. We see this happening
in CI. Not locally because the testcase runs fast enough.
[How]
The testcase now sends 10,000 messages. This is still a lot of messages
while staying under the default checkpoint interval.
[Why]
When `wait_for_messages_ready/3` returns, we are sure that the replicas
are in the expected state. However, while the `#amqqueue{}` record is
updated in Khepri, we don't know when all Khepri store members will be
up-to-date.
It can happen that `Server0` is not up-to-date when we query that record
to get the list of replicas, leading to a test failure.
[How]
First, the check is moved to its own function in `queue_utils`.
Then, if Khepri is being used, we use a Khepri fence to ensure previous
operations were applied on the given server. This way, we get a
consistent view of the `#amqqueue{}` record and thus the list of
replicas.
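The fence-then-read pattern can be modeled with a toy store. This is a
hedged Python sketch of the idea only; the class, `submit`, `fence`,
and `get` names are invented for illustration and do not reflect the
real Khepri API.

```python
class StoreMember:
    """Toy model of a replicated store member with asynchronous apply.

    Writes are queued and applied later; a fence applies everything
    submitted so far, so a read issued after the fence observes a
    consistent view of the record.
    """

    def __init__(self):
        self._applied = {}
        self._pending = []

    def submit(self, key, value):
        # Writes are acknowledged before being applied locally.
        self._pending.append((key, value))

    def fence(self):
        # Block until all previously submitted operations are applied
        # on this member.
        while self._pending:
            key, value = self._pending.pop(0)
            self._applied[key] = value

    def get(self, key):
        # A plain local read: only consistent after a fence.
        return self._applied.get(key)
```

Without the fence, the local read may miss the latest write, which is
exactly how `Server0` could return a stale replica list.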