In some cases we can compute this incrementally with the streaming hash
utilities from crypto:
```
> Hash0 = crypto:hash_init(sha256),
> Hash1 = crypto:hash_update(Hash0, Data0),
%% ...
> HashN = crypto:hash_update(HashN1, DataN),
> Hash = crypto:hash_final(HashN).
```
For large bodies especially, this lets us skip a lot of duplicated work.
Currently this is only added to the direct_request API, with the idea
that the other method, which blocks the server, is deprecated.
Binaries are unnecessarily restrictive since gun allows sending iodata
bodies. This would be useful for larger requests where we want to
combine multiple lists or binaries without paying the cost of
concatenating them. Note that `crypto:hash/2` accepts an `iodata()` arg.
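For illustration, here is a minimal sketch (the body contents are made
up) of building an iodata body from nested lists and binaries and
hashing it directly with `crypto:hash/2`, without any concatenation:
```
%% Illustrative only: an iodata body made of nested lists and binaries
%% can be hashed (and sent by gun) as-is, without flattening it first.
Part1 = <<"{\"Records\":[">>,
Part2 = ["{\"id\":", integer_to_binary(1), "}"],
Body  = [Part1, Part2, <<"]}">>],
%% Same digest as hashing the equivalent flat binary.
PayloadHash = crypto:hash(sha256, Body).
```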
This commit also avoids double-SHA256-hashing the request body in
`rabbitmq_aws_sign`. The `request_hash/5` function hashed the body a
second time with `sha256/1` to get the payload hash, but that value is
already computed at the top of `headers/1`.
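The shape of the change, as a hypothetical sketch (not the actual
`rabbitmq_aws_sign` code; `binary:encode_hex/1` needs OTP 24+):
```
%% Hypothetical sketch: hash the body once, put the digest in the
%% headers, and reuse the same digest for the canonical request instead
%% of calling sha256 on the body a second time.
payload_hash(Body) ->
    string:lowercase(binary:encode_hex(crypto:hash(sha256, Body))).

sign_sketch(Body, Headers0) ->
    PayloadHash = payload_hash(Body),
    Headers = [{"x-amz-content-sha256", PayloadHash} | Headers0],
    {Headers, PayloadHash}.
```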
Retry unregistering a stream from its group in case of stream
coordinator timeout/unavailability. The operation can fail during or
after a network partition, which is expected, but it is harmless to
retry it to clean up the SAC group: the operation is idempotent anyway.
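A minimal sketch of the retry shape, with hypothetical function and
error names (the real SAC/coordinator calls differ):
```
%% Hypothetical sketch: retry an idempotent unregister call a bounded
%% number of times when the stream coordinator times out or is
%% unavailable. sac_unregister/2 is a placeholder, not the real API.
unregister_with_retry(_Group, _Stream, 0) ->
    ok; %% the call is only a cleanup, so give up quietly after N attempts
unregister_with_retry(Group, Stream, Retries) ->
    case sac_unregister(Group, Stream) of
        ok ->
            ok;
        {error, Reason} when Reason =:= timeout;
                             Reason =:= coordinator_unavailable ->
            timer:sleep(1000),
            unregister_with_retry(Group, Stream, Retries - 1)
    end.
```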
Sometimes a plugin needs to list online peers: nodes that are running,
reachable, not under maintenance, and have a specific plugin enabled.
This commit introduces a few helper functions to make such cluster
member queries trivial.
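A rough sketch of what such a query can look like; `running_nodes/0`
and `under_maintenance/1` are placeholders, and calling
`rabbit_plugins:is_enabled/1` over `erpc` is an assumption rather than
the actual helpers added by this commit:
```
%% Sketch only: filter cluster members down to reachable peers that are
%% not being drained and have the given plugin enabled.
peers_with_plugin(Plugin) ->
    [N || N <- running_nodes(),                 %% placeholder helper
          N =/= node(),
          not under_maintenance(N),             %% placeholder helper
          erpc:call(N, rabbit_plugins, is_enabled, [Plugin]) =:= true].
```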
This commit fixes the following test flake that occurred in CI:
```
make -C deps/rabbit ct-amqp_dotnet t=cluster_size_1:redelivery
```
After receiving the client's end frame, the server session proc replies with its own end frame.
Usually when the test case succeeds, the server connection process receives
a DOWN for the session proc and untracks its channel number such that a
subsequent begin frame for the same channel number will create a new session
proc in the server.
In the flake, however, the client receives the end and pipelines new
begin, attach, and flow frames. These frames arrive in the server
connection's mailbox before the monitor for the old session proc fires,
so they are forwarded to the old session proc, causing the test case to
fail.
This reveals a bug in the server.
This commit fixes this bug similarly to how it is done in the AMQP 0.9.1 channel in
94b4a6aafd/deps/rabbit/src/rabbit_channel.erl (L1146-L1155).
Channel reuse by the client is valid and actually common, e.g. if channel-max
is 0.
These functions will be used in the child commit for a check on
the number of exchanges. We can use the projection to avoid bothering
the Khepri process with a query.
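A Khepri projection is backed by an ETS table, so such a count is a
local read. A sketch, assuming `rabbit_khepri_exchange` as the
projection table name (the real name may differ):
```
%% Sketch: count exchanges from the projection's ETS table instead of
%% issuing a query to the Khepri process. The table name is an assumption.
count_exchanges() ->
    ets:info(rabbit_khepri_exchange, size).
```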
[Why]
In CI, we sometimes get a failure when we try to forget node 3. The
CLI doesn't report the nature of the error unfortunately.
I suppose it's related to the fact that node 3 is stopped and forgotten
before all three replicas of the previously declared stream queue are
ready. This is just a guess though; I have no proof that it is the
actual error.
[How]
We wait for the replicas after declaring the stream queue.
[Why]
Sometimes it returns `false` in CI. `meck:validate/1` can return false
if the mocked module throws an exception. So perhaps this is a timing
issue in CI, where the runner is usually slower than our working
computers?
[Why]
Sometimes, at least in CI, the output of the CLI appears to be prefixed
with a leading newline, and sometimes it is not. This breaks the check
of that output.
[How]
We just trim the output before parsing it. The parsing already takes
care of trimming internal whitespace.
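For example, `string:trim/1` removes the stray leading newline (and any
trailing one) in a single call; the output string below is made up:
```
> string:trim("\nrabbit@host-1 ok\n").
"rabbit@host-1 ok"
```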
Prior to this commit, the following test case flaked:
```
make -C deps/rabbitmq_mqtt ct-v5 t=cluster_size_1:session_upgrade_v3_v5_qos1
```
The test case failed with:
```
{v5_SUITE,session_upgrade_v3_v5_qos,1112}
{test_case_failed,Received unexpected PUBLISH payload. Expected: <<"2">> Got: <<"1">>}
```
The broker logs showed:
```
2025-07-15 15:50:23.914152+00:00 [debug] <0.758.0> MQTT accepting TCP connection <0.758.0> (127.0.0.1:38594 -> 127.0.0.1:27005)
2025-07-15 15:50:23.914289+00:00 [debug] <0.758.0> Received a CONNECT, client ID: session_upgrade_v3_v5_qos, username: undefined, clean start: false, protocol version: 3, keepalive: 60, property names: []
2025-07-15 15:50:23.914403+00:00 [debug] <0.758.0> MQTT connection 127.0.0.1:38594 -> 127.0.0.1:27005 picked vhost using plugin_configuration_or_default_vhost
2025-07-15 15:50:23.914480+00:00 [debug] <0.758.0> User 'guest' authenticated successfully by backend rabbit_auth_backend_internal
2025-07-15 15:50:23.914641+00:00 [info] <0.758.0> Accepted MQTT connection 127.0.0.1:38594 -> 127.0.0.1:27005 for client ID session_upgrade_v3_v5_qos
2025-07-15 15:50:23.914977+00:00 [debug] <0.758.0> Received a SUBSCRIBE with subscription(s) [{mqtt_subscription,
2025-07-15 15:50:23.914977+00:00 [debug] <0.758.0> <<"session_upgrade_v3_v5_qos">>,
2025-07-15 15:50:23.914977+00:00 [debug] <0.758.0> {mqtt_subscription_opts,1,false,
2025-07-15 15:50:23.914977+00:00 [debug] <0.758.0> false,0,undefined}}]
2025-07-15 15:50:23.924503+00:00 [debug] <0.764.0> MQTT accepting TCP connection <0.764.0> (127.0.0.1:38608 -> 127.0.0.1:27005)
2025-07-15 15:50:23.924922+00:00 [debug] <0.764.0> Received a CONNECT, client ID: session_upgrade_v3_v5_qos, username: undefined, clean start: false, protocol version: 5, keepalive: 60, property names: []
2025-07-15 15:50:23.925589+00:00 [error] <0.758.0> writing to MQTT socket #Port<0.63> failed: closed
2025-07-15 15:50:23.925635+00:00 [debug] <0.764.0> MQTT connection 127.0.0.1:38608 -> 127.0.0.1:27005 picked vhost using plugin_configuration_or_default_vhost
2025-07-15 15:50:23.925670+00:00 [info] <0.758.0> MQTT connection <<"127.0.0.1:38594 -> 127.0.0.1:27005">> will terminate because peer closed TCP connection
2025-07-15 15:50:23.925727+00:00 [debug] <0.764.0> User 'guest' authenticated successfully by backend rabbit_auth_backend_internal
2025-07-15 15:50:24.000790+00:00 [info] <0.764.0> Accepted MQTT connection 127.0.0.1:38608 -> 127.0.0.1:27005 for client ID session_upgrade_v3_v5_qos
2025-07-15 15:50:24.016553+00:00 [warning] <0.764.0> MQTT disconnecting client <<"127.0.0.1:38608 -> 127.0.0.1:27005">> with client ID 'session_upgrade_v3_v5_qos', reason: normal
```
This shows evidence that the MQTT server connection did not process the
DISCONNECT packet. The hypothesis is that the server connection did not
even process the PUBACK packet from the client. Hence, the first message
got requeued and re-delivered to the new v5 client.
This commit fixes this flake by not acking the first message.
Hence, we always expect that the first message will be redelivered to
the new v5 client.
... when testing vhost limits
[Why]
The tracking is asynchronous, so the third MQTT connection might be
opened before the tracking is up-to-date, which the testcase doesn't
expect.
[Why]
In the `node_channel_limit` testcase, we open several channels and
verify the count of opened channels in all places but one: after the
first connection failure, when we try to open 3 channels.
Opening 3 channels in a row might not be tracked in time to reject the
third channel because the counter is updated asynchronously.
[How]
We simply wait for the counter to reach 5 before opening the third
channel.
In the process, we change all checks to use `?awaitMatch/3` to be more
robust against timing issues.
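A sketch of the waiting pattern, assuming the `?awaitMatch/3` macro
from rabbitmq_ct_helpers, a hypothetical `count_channels/1` lookup, and
a `Conn`/`Config` taken from the test context:
```
%% Wait up to 30 seconds for the tracked channel count to reach 5 before
%% opening the next channel; count_channels/1 stands in for the actual
%% counter lookup used by the test suite.
?awaitMatch(5, count_channels(Config), 30000),
{ok, _Ch} = amqp_connection:open_channel(Conn),
```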