Commit Graph

408 Commits

Author SHA1 Message Date
Arnaud Cogoluègnes 67ac485e74
Use base64 encoding for random binary in test
The random binary contains characters that can make an authentication
test fail. Encoding it in base64 fixes the problem.
2025-09-04 09:35:12 +00:00
Michael Klishin 173876c982
More log message edits #14403 2025-08-19 14:06:15 -04:00
Michael Klishin a9f7bf1fbc
Log virtual host name if updated token lacks the permissions for it #14403 2025-08-19 14:05:31 -04:00
Michael Klishin e7634679d1
More log message edits #14403 2025-08-19 13:58:47 -04:00
Michael Klishin 55d6419bcd
More log message edits #14403 2025-08-19 13:44:50 -04:00
Michael Klishin 147dfb0e70
Log message wording #14402
(cherry picked from commit 987f5519e6)
2025-08-19 13:17:36 -04:00
Arnaud Cogoluègnes ba8745ab4b
Close stream connection if vhost not authorized after secret update 2025-08-19 16:47:47 +02:00
Arnaud Cogoluègnes 02449bd5d3
Close stream connection if secret update fails 2025-08-19 14:58:04 +02:00
Arnaud Cogoluègnes 22a959331b
Use advertised TLS host setting in metadata frame
The rabbitmq_stream.advertised_tls_host setting is not used in the
metadata frame of the stream protocol, even if it is set. This commit
makes sure the setting is used if set.

References rabbitmq/rabbitmq-stream-java-client#803
2025-08-08 12:33:52 +00:00
Arnaud Cogoluègnes 93f0b7860e
Retry stream SAC unregister consumer operation
Retry unregistering a stream from its group in case of stream
coordinator timeout/unavailability. The operation can fail during or
after a network partition, which is normally, but it is harmless to
retry it to clean up the SAC group. The operation is idempotent anyway.
2025-08-06 13:39:11 +00:00
Michal Kuratczyk a5106c6a61
Expose ra counters (#13895)
Trigger a 4.2.x alpha release build / trigger_alpha_build (push) Waiting to run Details
Test (make) / Build and Xref (1.18, 26) (push) Waiting to run Details
Test (make) / Build and Xref (1.18, 27) (push) Waiting to run Details
Test (make) / Build and Xref (1.18, 28) (push) Waiting to run Details
Test (make) / Test (1.18, 28, khepri) (push) Waiting to run Details
Test (make) / Test (1.18, 28, mnesia) (push) Waiting to run Details
Test (make) / Test mixed clusters (1.18, 28, khepri) (push) Waiting to run Details
Test (make) / Test mixed clusters (1.18, 28, mnesia) (push) Waiting to run Details
Test (make) / Type check (1.18, 28) (push) Waiting to run Details
Switch from ra_metrics to ra_counters

* Expose many more metrics (they are also up to date)
* Bump Seshat, Ra, Osiris, Prometheus.erl
* switch from proplists to maps
2025-07-24 10:43:20 +02:00
Michal Kuratczyk 8a05433897
[skip ci] Remove rabbit_log_connection and use LOG_ macros 2025-07-18 08:43:02 +02:00
Michal Kuratczyk 175ba70e8c
[skip ci] Remove rabbit_log and switch to LOG_ macros 2025-07-18 08:42:59 +02:00
Arnaud Cogoluègnes 0d84c8e9a5
Increase timeouts and improve error logging in stream test 2025-07-16 10:22:20 +02:00
Michael Klishin 8d9c1177d5
rabbitmq-stream reset_offset: respect --quiet and --silent 2025-07-01 17:46:51 +02:00
Arnaud Cogoluègnes c50fc90e47
Add rabbitmq-streams reset_offset command
A user can set the stored offset for a stream/reference couple to 0.
This way a consumer can keep the same name and re-attach to the
beginning of a stream.

References https://github.com/rabbitmq/rabbitmq-server/discussions/14124
2025-07-01 17:46:44 +02:00
Arnaud Cogoluègnes 2f048b4b57
Close stream consumer log after stream is deleted or unavailable
References #14127
2025-06-27 10:36:15 +02:00
Arnaud Cogoluègnes 72df6270b2
Mention socket is from stream reader in log message
Test (make) / Build and Xref (1.17, 26) (push) Waiting to run Details
Test (make) / Build and Xref (1.17, 27) (push) Waiting to run Details
Test (make) / Test (1.17, 27, khepri) (push) Waiting to run Details
Test (make) / Test (1.17, 27, mnesia) (push) Waiting to run Details
Test (make) / Test mixed clusters (1.17, 27, khepri) (push) Waiting to run Details
Test (make) / Test mixed clusters (1.17, 27, mnesia) (push) Waiting to run Details
Test (make) / Type check (1.17, 27) (push) Waiting to run Details
Trigger a 4.2.x alpha release build / trigger_alpha_build (push) Has been cancelled Details
2025-06-19 15:50:47 +02:00
Arnaud Cogoluègnes 41acc117bd
Add activate_stream_consumer command
New CLI command to trigger a rebalancing in a SAC group and activate a
consumer. This is a last resort solution if all consumers in a group
accidently end up in {connected, waiting} state.

The command re-uses an existing function, which only picks the consumer
that should be active. This means it does not try to "fix" the state
(e.g. removing a disconnected consumer because its node is definitely
gone from the cluster).

Fixes #14055
2025-06-17 11:56:37 +02:00
Arnaud Cogoluègnes 58f4e83c22
Close stream connection in case of unexpected error from SAC coordinator
Calls to the stream SAC coordinator can fail for various reason
(e.g. a timeout because of a network partition). The stream reader does not
take into account what the SAC coordinator returns and moves on even
in case of errors. This can lead to inconsistent state for SAC groups.

This commit changes this behavior by handling unexpected errors from the
SAC coordinator and closing the connection. The client is expected to
reconnect. This is safer than risking inconsistent state.

Fixes #14040
2025-06-17 11:56:37 +02:00
Arnaud Cogoluègnes a9cf049030
Remove only stream subscriptions affected by down stream member
The clean-up of a stream connection state when a stream member goes down can
remove subscriptions not affected by the member. The subscription state is
removed from the connection, but the subscription is not removed from
the SAC state (if the subscription is a SAC), because the subscription member
PID does not match the down member PID.

When the actual member of the subscription goes down, the subscription is no
longer part of the state, so the clean-up does not find the subscription
and does not remove it from the SAC state. This lets a ghost consumer in
the corresponding SAC group.

This commit makes sure only the affected subscriptions are removed from
the state when a stream member goes down.

Fixes #13961
2025-06-17 11:56:36 +02:00
Arnaud Cogoluègnes d1aab61566
Prevent blocked groups in stream SAC with fine-grained status
A boolean status in the stream SAC coordinator is not enough to follow
the evolution of a consumer. For example a former active consumer that
is stepping down can go down before another consumer in the group is
activated, letting the coordinator expect an activation request that
will never arrive, leaving the group without any active consumer.

This commit introduces 3 status: active (formerly "true"), waiting
(formerly "false"), and deactivating. The coordinator will now know when
a deactivating consumer goes down and will trigger a rebalancing to
avoid a stuck group.

This commit also introduces a status related to the connectivity state
of a consumer. The possible values are: connected, disconnected, and
presumed_down. Consumers are by default connected, they can become
disconnected if the coordinator receives a down event with a
noconnection reason, meaning the node of the consumer has been
disconnected from the other nodes. Consumers can become connected again when
their node joins the other nodes again.

Disconnected consumers are still considered part of a group, as they are
expected to come back at some point. For example there is no rebalancing
in a group if the active consumer got disconnected.

The coordinator sets a timer when a disconnection occurs. When the timer
expires, corresponding disconnected consumers pass into the "presumed
down" state. At this point they are no longer considered part of their
respective group and are excluded from rebalancing decision. They are expected
to get removed from the group by the appropriate down event of a
monitor.

So the consumer status is now a tuple, e.g. {connected, active}. Note
this is an implementation detail: only the stream SAC coordinator deals with
the status of stream SAC consumers.

2 new configuration entries are introduced:
 * rabbit.stream_sac_disconnected_timeout: this is the duration in ms of the
   disconnected-to-forgotten timer.
 * rabbit.stream_cmd_timeout: this is the timeout in ms to apply RA commands
   in the coordinator. It used to be a fixed value of 30 seconds. The
   default value is still the same. The setting has been introduced to
   make integration tests faster.

Fixes #14070
2025-06-17 11:56:20 +02:00
Arnaud Cogoluègnes 13e8564238
Return error if stream leader is undefined in stream manager
A stream may not have a leader temporarily for several reasons, e.g.
after it has been restarted. The stream manager may return undefined in
this case. Some client code may crash because it expects a PID or an
error, but not undefined.

This commit makes sure the leader PID is an actual Erlang PID and
returns {error, not_available} if it is not.

References #13962
2025-06-16 16:38:29 +02:00
Arnaud Cogoluègnes 52c89ab7a3
Always emit consumer_deleted event when stream consumer goes away
Not only when it is removed explicitly by the client. This is necessary
to make sure the consumer record is removed from the management ETS
tables (consumer_stats) and to avoid ghost consumers.

For other protocols like AMQP 091, the consumer_status ETS table is
cleaned up when a channel goes down, but there is no channel concept in
the stream protocol.

This is not consistent with other protocols or queue implementations
(which emits the event only on explicit consumer cancellation)
but is necessary to clean up stats correctly.

References #13092
2025-05-28 17:10:50 +02:00
Arnaud Cogoluègnes cad8b70ee8
Fix partition index conflict in stream SAC coordinator
Consumers with a same name, consuming from the same stream should have
the same partition index. This commit adds a check to enforce this rule
and make the subscription fail if it does not comply.

Fixes #13835
2025-05-06 16:11:46 +02:00
Diana Parra Corbacho ef09b190ce Mgmt UI: Add super streams page 2025-05-04 19:53:55 +02:00
Michal Kuratczyk 09ed8fdc07
Ignore stream connections in unexpected states
A connection which terminated before it was fully established
would lead to a function_clause, since metadata is not available
to really call notify_connection_closed. We can just ignore such
connections and not notify about them.

Resolves https://github.com/rabbitmq/rabbitmq-server/discussions/13670
2025-04-02 23:38:55 +02:00
Arnaud Cogoluègnes 602b6acd7d
Re-evaluate stream SAC group after connection down event
The same connection can contain several consumers belonging to a SAC
group (group key = vhost + stream + consumer name). The whole new group
must be re-evaluated to select a new active consumer after the consumers
of the down connection are removed from it.

The previous behavior would not re-evaluate the new group and could
select a consumer from the down connection, letting the group with only
inactive consumers, as the selected active consumer would never receive
the activation message from the stream SAC coordinator.

This commit fixes this problem by removing the consumers of the down
down connection from the affected groups and then performing the
appropriate operations for the groups to keep on consuming (e.g.
notifying an active consumer that it needs to step down).

References #13372
2025-03-31 14:59:59 +02:00
Arnaud Cogoluègnes 31a4d611f1
Emit events on stream consume and cancel 2025-01-20 17:24:55 +01:00
Arnaud Cogoluègnes 69d0382dd2
Emit cancellation event only when stream consumer is cancelled
Not when the channel or the connection is closed.

References #13085, #9356
2025-01-17 14:42:40 +01:00
Arnaud Cogoluègnes 415dc81655
Ignore CLI info calls during stream connection initialization (#13049)
The connection cannot return some information while initializing, so we
just return no information.

The CLI info call was supported only in the open gen_statem callback, so
such a call during the connection init would make it crash. This can
happen when several stream connections get closed and the user calls
list_stream_consumers or list_stream_connections while the connection
are recovering.

This commit adds a clause for CLI info calls in the all the gen_statem
callbacks and returns actual information only when appropriate.
2025-01-15 17:07:04 +01:00
Michael Klishin 968eefa1bb
Bump (c) line year
There are no functional changes to this massive diff.
2025-01-01 17:54:10 -05:00
Arnaud Cogoluègnes 26f941b815
Squash dialyzer warning 2024-11-07 18:17:47 +01:00
Arnaud Cogoluègnes 1554b74fc7
Use 4-space indent in stream manager 2024-11-07 17:03:50 +01:00
Arnaud Cogoluègnes 5107fd48ba
Remove gen_server behaviour from stream manager
The stream manager does not need to be a gen_server (no cast, no state)
and the gen_server can create contention for large stream deployments
(some functions make cluster-wide calls that can take some time).
2024-11-07 16:50:31 +01:00
Arnaud Cogoluègnes 966e06f2f7
Use inner module function to increment stream protocol counter
Reduce duplication.
2024-10-14 09:53:48 +02:00
Arnaud Cogoluègnes affdeb3125
Use macro for stream publisher/consumer reference check guard
References #12499
2024-10-14 09:24:57 +02:00
Arnaud Cogoluègnes 622dec011d
Return error if store offset reference is longer than 255 characters 2024-10-11 14:55:44 +02:00
Arnaud Cogoluègnes 0260862a27
Return error if stream consumer reference is longer than 255 characters 2024-10-11 11:29:09 +02:00
Arnaud Cogoluègnes 4e8fb46bbf
Return error if stream publisher reference is longer than 255 characters
Fixes #12499
2024-10-11 10:34:45 +02:00
Michal Kuratczyk f0f7500f6a
Revert "Log errors from `ranch:handshake`" (#12304)
This reverts commit 620fff22f1.

It intoduced a regression in another area - a TCP health check,
such as the default (with cluster-operator) readinessProbe,
on a TLS-enabled instance would log a `rabbit_reader` crash
every few seconds:
```
tls-server-0 rabbitmq 2024-09-13 09:03:13.010115+00:00 [error] <0.999.0>   crasher:
tls-server-0 rabbitmq 2024-09-13 09:03:13.010115+00:00 [error] <0.999.0>     initial call: rabbit_reader:init/3
tls-server-0 rabbitmq 2024-09-13 09:03:13.010115+00:00 [error] <0.999.0>     pid: <0.999.0>
tls-server-0 rabbitmq 2024-09-13 09:03:13.010115+00:00 [error] <0.999.0>     registered_name: []
tls-server-0 rabbitmq 2024-09-13 09:03:13.010115+00:00 [error] <0.999.0>     exception error: no match of right hand side value {error, handshake_failed}
tls-server-0 rabbitmq 2024-09-13 09:03:13.010115+00:00 [error] <0.999.0>       in function  rabbit_reader:init/3 (rabbit_reader.erl, line 171)
```
2024-09-13 17:07:57 +02:00
Péter Gömöri 065395e9b8 Hibernate 2 metrics gc processes
It was observed that `rabbit_core_metrics_gc` and
`rabbit_stream_metrics_gc` processes can grow to several MBs of
memory (probably because fetching the list of all queues). As they
execute infrequently (every 2 minutes by default) it can save some
memory to hibernate them in-between (similar to other similar
processes).
2024-08-28 19:41:27 +02:00
Arnaud Cogoluègnes c9951ec1f4
Close stream connection with delay in case of authentication failure
For consistency with other protocols (to protect from potential DoS
attacks). Wrong credentials and virtual host access errors trigger
the delay.
2024-07-25 17:00:36 +02:00
Michael Davis 52a0d70e15
Handle database timeouts when declaring exchanges
The spec of `rabbit_exchange:declare/7` needs to be updated to return
`{ok, Exchange} | {error, Reason}` instead of the old return value of
`rabbit_types:exchange()`. This is safe to do since `declare/7` is not
called by RPC - from the CLI or otherwise - outside of test suites, and
in test suites only through the CLI's `TestHelper.declare_exchange/7`.
Callers of this helper are updated in this commit.

Otherwise this commit updates callers to unwrap the `{ok, Exchange}`
and bubble up errors.
2024-07-22 16:02:03 -04:00
Michael Davis e7489d2cb7
Handle database failures when deleting exchanges
A common case for exchange deletion is that callers want the deletion
to be idempotent: they treat the `ok` and `{error, not_found}` returns
from `rabbit_exchange:delete/3` the same way. To simplify these
callsites we add a `rabbit_exchange:ensure_deleted/3` that wraps
`rabbit_exchange:delete/3` and returns `ok` when the exchange did not
exist. Part of this commit is to update callsites to use this helper.

The other part is to handle the `rabbit_khepri:timeout()` error possible
when Khepri is in a minority. For most callsites this is just a matter
of adding a branch to their `case` clauses and an appropriate error and
message.
2024-07-22 15:59:55 -04:00
Michael Davis f1be7bacc2
Handle database failures when adding/removing bindings
This ensures that the call graph of `rabbit_db_binding:create/2` and
`rabbit_db_binding:delete/2` handle the `{error, timeout}` error
possible when Khepri is in a minority.
2024-07-22 14:16:39 -04:00
David Ansari 18e8c1d5f8 Require all stable feature flags added up to 3.13.0
Since feature flag `message_containers` introduced in 3.13.0 is required in 4.0,
we can also require all other feature flags introduced in or before 3.13.0
and remove their compatibility code for 4.0:

* restart_streams
* stream_sac_coordinator_unblock_group
* stream_filtering
* stream_update_config_command
2024-07-11 11:20:26 +02:00
Simon Unge 19a751890c Remove checks to vhost-limit as that is now handled by rabbit_queue_type:declare
Add new error return tuple when queue limit is exceed
2024-05-27 09:53:54 +02:00
Luke Bakken 620fff22f1
Log errors from `ranch:handshake`
Fixes #11171

An MQTT user encountered TLS handshake timeouts with their IoT device,
and the actual error from `ssl:handshake` / `ranch:handshake` was not
caught and logged.

At this time, `ranch` uses `exit(normal)` in the case of timeouts, but
that should change in the future
(https://github.com/ninenines/ranch/issues/336)
2024-05-06 08:24:38 -07:00
David Ansari f6d4fc2e72 Print more logs when stream connection fails to open
This commit will print
```
[debug] <0.725.0> Transitioned from tcp_connected to peer_properties_exchanged
[debug] <0.725.0> Transitioned from peer_properties_exchanged to authenticating
[debug] <0.725.0> User 'guest' authenticated successfully by backend rabbit_auth_backend_internal
[debug] <0.725.0> Transitioned from authenticating to authenticated
[debug] <0.725.0> Tuning response 1048576 0
[debug] <0.725.0> Open frame received for fakevhost
[warning] <0.725.0> Opening connection failed: access to vhost 'fakevhost' refused for user 'guest'
[debug] <0.725.0> Transitioned from tuning to failure
[warning] <0.725.0> Closing socket #Port<0.48>. Invalid transition from tuning to failure.
[debug] <0.725.0> rabbit_stream_reader terminating in state 'tuning' with reason 'normal'
```
when the user doesn't have access to the vhost.
2024-04-15 16:02:26 +02:00