The stream plugin can send frames to a client connection
and expect a response from it. This is currently used
for the consumer_update frame (single active consumer feature).
There was no timeout mechanism so far, so a slow or blocked
application could prevent a group of consumers from moving on.
This commit introduces a timeout mechanism: if the expected
response takes too long to arrive, the server assumes the
connection is blocked and closes it.
The default timeout is 60 seconds but it can be changed by setting
the request_timeout parameter of the rabbitmq_stream application.
Note the mechanism does not enforce the exact duration of the timeout,
as a timer is set for the first request and re-used for subsequent
requests. With bad timing, a request can take up to twice the
configured timeout before it times out.
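For example, a hypothetical advanced.config snippet tuning the timeout
(the `request_timeout` parameter is from this commit; the millisecond
unit is an assumption):

```
%% advanced.config sketch: tune the stream request timeout.
%% The unit is assumed to be milliseconds here.
[
 {rabbitmq_stream, [
   {request_timeout, 120000}  %% 2 minutes instead of the 60-second default
 ]}
].
```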
References #7743
A group of consumers on a super stream can end up blocked
without an active consumer. This can happen with consumer
churn: one consumer gets removed, which makes the active
consumer passive, but the former active consumer never
gets to know because it has been removed itself.
This commit changes the structure of the messages the SAC
coordinator sends to consumer connections, to embed enough
information to look up the group and to instruct it to choose
a new active consumer when the race condition mentioned above
comes up.
Because of the changes in the structure of messages, a feature
flag is required to make sure the SAC coordinator starts
sending the new messages only when all the nodes have been upgraded.
References #7743
Not taking the credits can starve the subscription,
making it permanently under its credit send limit.
The subscription then never dispatches messages when
it becomes active again.
This happens in an active-inactive-active cycle, especially
with slow consumers.
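A minimal sketch of the fix as described, with hypothetical record and
function names:

```
-record(consumer, {credit = 0 :: non_neg_integer(),
                   active = true :: boolean()}).

%% Take the credit even while the consumer is inactive, so the
%% counter is correct once it becomes active again; only the
%% dispatch step is skipped.
handle_credit(Credit, #consumer{credit = Current} = Consumer0) ->
    Consumer = Consumer0#consumer{credit = Current + Credit},
    case Consumer#consumer.active of
        true  -> dispatch_chunks(Consumer);
        false -> Consumer
    end.

dispatch_chunks(Consumer) ->
    Consumer.  %% placeholder for the actual dispatch logic
```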
So far, we had the following functions to list nodes in a RabbitMQ
cluster:
* `rabbit_mnesia:cluster_nodes/1` to get members of the Mnesia cluster;
the argument was used to select members (all members or only those
running Mnesia and participating in the cluster)
* `rabbit_nodes:all/0` to get all members of the Mnesia cluster
* `rabbit_nodes:all_running/0` to get all members who currently run
Mnesia
Basically:
* `rabbit_nodes:all/0` calls `rabbit_mnesia:cluster_nodes(all)`
* `rabbit_nodes:all_running/0` calls `rabbit_mnesia:cluster_nodes(running)`
We also have:
* `rabbit_node_monitor:alive_nodes/1` which filters the given list of
nodes to only select those currently running Mnesia
* `rabbit_node_monitor:alive_rabbit_nodes/1` which filters the given
list of nodes to only select those currently running RabbitMQ
Most of the code uses `rabbit_mnesia:cluster_nodes/1` or the
`rabbit_nodes:all*/0` functions. `rabbit_mnesia:cluster_nodes(running)`
or `rabbit_nodes:all_running/0` is often used as a close approximation
of "all cluster members running RabbitMQ". This list might be incorrect
in times where a node is joining the clustered or is being worked on
(i.e. Mnesia is running but not RabbitMQ).
With Khepri, this approximation will no longer be possible because we
will try to keep Khepri/Ra running even when RabbitMQ is stopped to
expand/shrink the cluster.
So in order to clarify what we want when we query a list of nodes, this
patch introduces the following functions:
* `rabbit_nodes:list_members/0` to get all cluster members, regardless
of their state
* `rabbit_nodes:list_reachable/0` to get all cluster members we can
reach using Erlang distribution, regardless of the state of RabbitMQ
* `rabbit_nodes:list_running/0` to get all cluster members who run
RabbitMQ, regardless of the maintenance state
* `rabbit_nodes:list_serving/0` to get all cluster members who run
RabbitMQ and are accepting clients
In addition to the list functions, there are the corresponding
`rabbit_nodes:is_*(Node)` checks and `rabbit_nodes:filter_*(Nodes)`
filtering functions.
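A sketch of how call sites pick among these functions depending on what
"nodes" means for them (the exact `is_*`/`filter_*` names below are
inferred from the naming pattern, so treat them as assumptions):

```
node_lists_example() ->
    Members = rabbit_nodes:list_members(),      %% membership, any state
    Reachable = rabbit_nodes:list_reachable(),  %% Erlang distribution up
    Running = rabbit_nodes:list_running(),      %% RabbitMQ running
    Serving = rabbit_nodes:list_serving(),      %% running + serving clients
    ServingSubset = rabbit_nodes:filter_serving(Members),
    {Members, Reachable, Running, Serving, ServingSubset}.
```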
The code is modified to use these new functions. One possible
significant change is that the new list functions will perform RPC calls
to query the nodes' state, unlike `rabbit_mnesia:cluster_nodes(running)`.
RabbitMQ 3.12 requires feature flag `feature_flags_v2` which got
introduced in 3.11.0 (see
https://github.com/rabbitmq/rabbitmq-server/pull/6810).
Therefore, we can mark all feature flags that got introduced in 3.11.0
or earlier as required because users will have to upgrade to
3.11.x first, before upgrading to 3.12.x.
The advantage of marking these feature flags as required is that we can
start deleting any compatibility code for these feature flags, similarly
to what was done in https://github.com/rabbitmq/rabbitmq-server/issues/5215
This list shows when a given feature flag was first introduced:
```
classic_mirrored_queue_version 3.11.0
stream_single_active_consumer 3.11.0
direct_exchange_routing_v2 3.11.0
listener_records_in_ets 3.11.0
tracking_records_in_ets 3.11.0
empty_basic_get_metric 3.8.10
drop_unroutable_metric 3.8.10
```
In this commit, we also force all required feature flags in the Erlang
application `rabbit` to be enabled in mixed-version cluster testing
and delete any tests that covered a feature flag starting as disabled.
Furthermore, this commit already deletes the callback (migration) functions
given they do not run anymore in 3.12.x.
All other clean up (i.e. branching depending on whether a feature flag
is enabled) will be done in separate commits.
This commit is pure refactoring, making the code base more maintainable.
Replace `rabbit_misc:pipeline/3` with the new OTP 25 experimental `maybe`
expression because
"Frequent ways in which people work with sequences of failable
operations include folds over lists of functions, and abusing list
comprehensions. Both patterns have heavy weaknesses that makes them less
than ideal."
https://www.erlang.org/eeps/eep-0049#obsoleting-messy-patterns
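As an illustration (the step functions below are stand-ins, not the
actual processor code), a sequence of failable operations reads
naturally with `maybe`:

```
-module(maybe_example).
-feature(maybe_expr, enable).
-export([connect/2]).

connect(Packet, State0) ->
    maybe
        ok ?= check_protocol_version(Packet),
        {ok, State1} ?= authenticate(Packet, State0),
        {ok, State2} ?= register_client(State1),
        {ok, State2}
    else
        {error, Reason} ->
            {error, Reason}
    end.

%% Illustrative stubs; the real steps are more involved.
check_protocol_version(_Packet) -> ok.
authenticate(_Packet, State) -> {ok, State}.
register_client(State) -> {ok, State}.
```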
Additionally, this commit is more restrictive in the type spec of
rabbit_mqtt_processor state fields.
Specifically, many fields were defined as `undefined | T`, where
`undefined` applied only temporarily, until the first CONNECT packet was
processed by the processor.
It's better to initialise the MQTT processor upon the first CONNECT
packet because there is no point in having a processor without having
received any packet.
This allows many type specs in the processor to change from `undefined |
T` to just `T`.
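For instance (the field name here is illustrative, not the actual one):

```
%% Before: the spec had to account for the pre-CONNECT phase:
%%   -record(state, {client_id :: undefined | binary()}).
%% After: the processor only exists once CONNECT has been processed:
-record(state, {client_id :: binary()}).
```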
Additionally, memory is saved by removing the `received_connect_packet`
field from the `rabbit_mqtt_reader` and `rabbit_web_mqtt_handler`.
- Use the same base .plt everywhere, so there is no need to list
standard apps everywhere
- Fix typespecs: some typos and the use of non-exported types
1. Allow inspecting a (web) MQTT connection.
2. Show MQTT client ID on connection page as part of client_properties.
3. Handle force_event_refresh (when management_plugin gets enabled
after (web) MQTT connections got created).
4. Reduce code duplication between protocol readers.
5. Display '?' instead of 'NaN' in UI for absent queue metrics.
6. Allow a (web) MQTT connection to be closed via the management_plugin.
For 6. this commit takes the same approach as already done for the stream
plugin:
The stream plugin registers neither with {type, network} nor {type,
direct}.
We cannot use gen_server:call/3 anymore to close the connection
because the web MQTT connection cannot handle gen_server calls (only
casts).
Strictly speaking, this commit requires a feature flag to allow
force-closing stream connections from the management plugin during a rolling
update. However, given that this is rather an edge case, and there is a
workaround (connect to the node directly hosting the stream connection),
this commit will not introduce a new feature flag.
This allows us to stop ignoring undefined callback warnings
When mix compiles rabbitmqctl, it produces a 'consolidated' directory
alongside the 'ebin' dir. Some of the modules in consolidated are
intended to be used instead of those provided by elixir. We now handle
the conflicts properly in the bazel build.
Also change consumer credit top-ups to delay calling send_chunks until
there is a "batch" of credit to consume. Most clients at the time of
writing send single credit updates after receiving each chunk, so here
we won't enter the send loop unless more than half the initial credits
are available.
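A sketch of that batching condition (`send_chunks` is named in this
commit; the surrounding function name and signature are assumptions):

```
%% Only re-enter the send loop once more than half of the initial
%% credits have accumulated again.
maybe_send_chunks(Credit, InitialCredit, Consumer, State) ->
    case Credit > InitialCredit div 2 of
        true  -> send_chunks(Consumer, State);
        false -> State  %% wait for further credit top-ups
    end.
```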
osiris v1.4.3
As per a conversation with @acogoluegnes, the `SaslHandshakeRequest`
frame does not have a `mechanism` field. This field is actually used in
the `SaslHandshakeResponse` and `SaslAuthenticateRequest` frames.
For the CLI; otherwise all the created individual streams end
up with their leader on the same node, which is not what we
want. "balanced" is a reasonable default and it can be overridden
anyway.
Stop sending connection_stats from protocol readers to rabbit_event.
Stop sending queue_stats from queues to rabbit_event.
Sending these stats every 5 seconds to the event manager process is
superfluous because no one handles these events.
They seem to be a relic from before the rabbit_core_metrics ETS tables
got introduced in 2016.
Delete the test head_message_timestamp_statistics because it tests that
head_message_timestamp is set correctly in queue_stats events,
although queue_stats events are not used anywhere.
The functionality of head_message_timestamp itself is still tested in
deps/rabbit/test/priority_queue_SUITE.erl and
deps/rabbit/test/temp/head_message_timestamp_tests.py
`rabbit_ff_registry` is an internal module of the feature flags
subsystem; it is not meant to be called from outside of it. In
particular, doing so may lead to crashes that are very difficult to
debug when a feature flag is enabled. The reason is that this module is
compiled and reloaded at runtime, so the module may disappear for a
short period of time. Another reason is that a process calling
`rabbit_ff_registry` may hold a reference to that module, preventing the
feature flags subsystem from performing its task.
The correct public API to query the state of a feature flag is
`rabbit_feature_flags:is_enabled/1`. It will always return a boolean. If
the feature flag being queried is in `state_changing` state, this
function takes care of waiting for the outcome.
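The safe pattern from outside the subsystem looks like this (the flag
name is one of the real flags listed earlier; the two branch functions
are placeholders):

```
handle_something() ->
    case rabbit_feature_flags:is_enabled(stream_single_active_consumer) of
        true  -> new_code_path();
        false -> legacy_code_path()
    end.

new_code_path() -> ok.     %% placeholder
legacy_code_path() -> ok.  %% placeholder
```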
See #5018.
This line gets logged when the client closes the connection to the
stream port before it authenticates successfully.
Some external load balancers, for example, connect to the stream port to
do health checks without sending any stream protocol frame.
This commit prevents the RabbitMQ log from being polluted.
Also rework elixir dependency handling, so we no longer rely on mix to
fetch the rabbitmq_cli deps
Also:
- Specify ra version with a commit rather than a branch
- Fixup compilation options for erlang 23
- Add missing ra reference in MODULE.bazel
- Add missing flag in oci.yaml
- Reduce bazel rbe jobs to try to save memory
- Use bazel built erlang for erlang git master tests
- Use the same cache for all the workflows but windows
- Avoid using `mix local.hex --force` in elixir rules
- Fetching seems blocked in CI, and this should reduce hex api usage in
all builds, which is always nice
- Remove xref and dialyze tags since rules_erlang 3 includes them in
the defaults
Some tools such as nvim + erlang_ls sometimes change the cwd if
they encounter a rebar.config. Here we move all rebar.config files
into the root of the project to avoid this and also have a single
point for formatting configuration.
Single active consumer must have a name, which is used as the reference
for storing offsets and as the name of the group the consumer belongs
to in case the stream is a partition of a super stream.
References #3753
A formerly active consumer can have in-flight credit
requests when it becomes inactive. This commit checks
the state of the consumer on credit requests and makes sure
not to dispatch messages if it's inactive.
Stream consumers can be active or not with SAC, so these 2 fields
are added to the stream metrics. This is the same as with
regular consumers.
References #3753
Separate the logic between a single SAC and a SAC in a partition of a
super stream (makes the code clearer), and wait for the former active
consumer's notification to select the new active consumer (avoids a lock
of some sort on the group, so consumers can come and go).
In the SAC coordinator, messages to connections cannot be sent
from the state machine or they would also be sent during
replication. Only the leader enforces side effects.
References #3753
The SAC group coordination should happen as part of the Raft state
machine, to make it more reliable and robust.
It would also benefit from the different "services" the coordinator
and Ra provide.
References #3753
Deprecate queue-leader-locator values 'random' and 'least-leaders'.
Both become value 'balanced'.
From now on only queue-leader-locator values 'client-local', and
'balanced' should be set.
'balanced' will place the leader on the node with the least leaders if
there are few queues and will place the leader on a random node if there
are many queues.
This avoids the expensive least-leaders calculation if there are many
queues.
This change also allows us to change the implementation of 'balanced' in
the future. For example 'balanced' could place a leader on a node
depending on resource usage or available node resources.
There is no need to expose implementation details like 'random' or
'least-leaders' as configuration to users.
rabbitmq_cli uses some private rules_erlang apis that have changed in
the upcoming release
Additionally:
- Avoid including both standard and test versions of amqp_client in
integration test suites
- Eliminate most of the compilation order hints (explicit first_srcs)
in the bazel build
- Fix an include statement - in bazel, an app is not available to
itself as a library at compilation time
Several listeners can be registered for the same offset,
which causes problems at low rates, where consumers often hit
the end of the stream. Osiris messages then accumulate
in the process mailbox, which slows the process down considerably
after some time.
This commit makes the listener registration idempotent for a given
offset.
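A sketch of the idempotency check (the names are assumptions; the
actual Osiris registration call is elided):

```
-record(consumer, {last_listener_offset :: undefined | non_neg_integer()}).

maybe_register_listener(Offset,
                        #consumer{last_listener_offset = Offset} = C) ->
    C;  %% a listener for this offset is already pending: skip
maybe_register_listener(Offset, Consumer) ->
    ok = register_offset_listener(Offset),
    Consumer#consumer{last_listener_offset = Offset}.

register_offset_listener(_Offset) ->
    ok.  %% placeholder for the actual Osiris call
```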
A stream may not be available or may be in an inconsistent state; the
stream coordinator reports this with an error, which the stream
manager used to handle as if it were an appropriate response. The stream
protocol adapter then tried to extract the topology from the error.
This commit makes the stream manager handle the error correctly,
so that the stream protocol adapter reports the unavailability
of the stream to the client.
This is useful in case new fields are needed in future
versions. A new-version node can ask an old-version node for new
fields in a mixed-version cluster.
If the old-version node returns a known
value for unknown fields instead of failing, the new
node can set up appropriate default values for these
fields in the result of the CLI commands.
bazel-erlang has been renamed rules_erlang. v2 is a substantial
refactor that brings Windows support. While this alone isn't enough to
run all rabbitmq-server suites on windows, one can at least now start
the broker (bazel run broker) and run the tests that do not start a
background broker process
Only a couple of fields of the stream consumer record change
very frequently (credits and Osiris log reference), so this commit
introduces a nested record in the main consumer record that
contains the immutable fields. This potentially avoids producing
a lot of garbage, especially when the consumer state contains
several properties (consumer name, or single active consumer information
in the future).
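A sketch of the record split (field names are illustrative, not the
actual ones):

```
%% Immutable once the subscription is created.
-record(consumer_configuration,
        {socket, subscription_id, stream, properties}).

%% Only the frequently-updated fields stay at the top level, so
%% updating them copies a much smaller record.
-record(consumer,
        {configuration :: #consumer_configuration{},
         credit :: non_neg_integer(),  %% changes on every dispatch
         log}).                        %% Osiris log reference
```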
Fixes #3841
The most recent description of the Osiris chunk format no longer
describes the timestamp field as "posix-ish". That wording was a bit
misleading, as the field is Erlang system time.
Add a link to the Erlang system time documentation to the subscription
command description to avoid confusion about the timestamp field.
Technically, duplicate names are supported by common test, but we have
seen them contribute to flakiness in our suite in practice.
(cherry picked from commit 513446b6d1)
In the test suite. Note a snapshot for 1.0-SNAPSHOT was
pushed by mistake a while ago, so this one must be
excluded. It's unlikely it will be erased, as the snapshots
for the first stable version should be 1.0.0-SNAPSHOT.
The protocol documentation uses decimal values for error and request key
codes.
Let's use hex values instead. This helps when looking at a request and
its response - 0x0006 and 0x8006 vs. 6 and 32774.
Also, when looking at output of protocol analysis tools like Wireshark,
a hexadecimal value will be printed, for example:
"Nov 1, 2021 23:05:19.395825508 GMT","60216,5552","00000009000600010000000701"
"Nov 1, 2021 23:05:19.396069528 GMT","5552,60216","0000000a80060001000000070001"
Above, we can visually identify delete publisher request and response
(0x0006 and 0x8006) and easily match them in the documentation of the
protocol.
Finally, the above argument applies to logging as well, as it is common
to log hex values, not decimal ones.
1. The response for the publisher declaration request does not contain
a publisher id.
2. Add a mechanism entry to the details of the SASL handshake request.
3. The SASL handshake response contains a list of mechanisms, not just a
single mechanism.
Use case: allow plain connections over one IP (an internal IP), and TLS
connections over another IP (e.g. an internet-routable IP). Without this
patch a cluster can only support access over one or the other IP, not
both.
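For example, a hypothetical classic-config snippet with AMQP listeners
bound to two different interfaces (the addresses are placeholders, and
ssl_options are omitted):

```
[
 {rabbit,
  [{tcp_listeners, [{"192.168.1.10", 5672}]},   %% internal IP, plain
   {ssl_listeners, [{"203.0.113.10", 5671}]}]}  %% routable IP, TLS
].
```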
(cherry picked from commit b9e6aad035)
We expect to have one stream for each routing key, but
as a binding can return several queues for a given key, we
leave that possibility open in the stream protocol.
Instead of injecting it into various places inside the code.
When the osiris log is closed, it will decrement the global "readers"
counter, which is why it is much safer to do this in terminate.
Otherwise metrics will not get cleaned up correctly when processes crash.
It's also tidier to do this in a single place, in terminate/3.
Pair: @kjnilsson
Signed-off-by: Gerhard Lazu <gerhard@lazu.co.uk>
since it fails with Bazel.
As discussed with @pjk25, let's set this value via application env,
make it configurable to the test, but not configurable to the user.
Add state timeouts.
If the client takes more than 10s for a single step in the authentication
protocol, make the server close the TCP connection.
Also close the TCP connection if the server times out in state
close_sent. That's the case when the client sends an invalid command
(after successful authentication), the server requests the client to
close the connection, but the client doesn't respond anymore.
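An illustrative gen_statem fragment (the state and reason names are
assumptions): arm a 10-second state timeout when entering an
authentication step, and stop - which closes the socket - if it fires:

```
authenticating(enter, _OldState, Data) ->
    {keep_state, Data, [{state_timeout, 10000, close}]};
authenticating(state_timeout, close, Data) ->
    %% client too slow: stop the state machine, closing the socket
    {stop, {shutdown, authentication_timeout}, Data};
authenticating(info, {tcp, _Socket, Frame}, Data) ->
    %% a frame arrived in time: cancel the timeout and continue
    {keep_state, handle_frame(Frame, Data), [{state_timeout, cancel}]}.

handle_frame(_Frame, Data) ->
    Data.  %% placeholder for the actual frame handling
```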
The default gen_server timeout is not enough to list busy connections.
Setting it to infinity allows the caller to decide the timeout,
as classic queues do. The `emit_info` family of functions sets its
own timeout for all the CLI commands.
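A sketch of the change (function and message names are assumptions):

```
%% Let the caller own the timeout instead of relying on
%% gen_server's 5-second default.
info(Pid, InfoItems) ->
    gen_server:call(Pid, {info, InfoItems}, infinity).
```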
Calling ensure_stats_timer after init_stats_timer and reset_stats_timer
is enough.
The idea is to call stop_stats_timer before hibernation and
ensure_stats_timer on wakeup. However, since we never call
stop_stats_timer in rabbit_stream_reader, we don't need to call
ensure_stats_timer on every network activity.
Before this commit test AlarmsTest.diskAlarmShouldNotPreventConsumption
of the Java client was failing.
When executing that test, the server failed with:
2021-06-25 16:11:02.886935+02:00 [error] <0.1301.0> exception exit: {unexpected_message,resume}
2021-06-25 16:11:02.886935+02:00 [error] <0.1301.0> in function rabbit_heartbeat:heartbeater/3 (src/rabbit_heartbeat.erl, line 138
because an attempt was made to resume the heartbeater without pausing
it first.
The above exception exit also happens on the master branch when
executing this test. However, the test falsely succeeds on master
because the following FIXME was never implemented:
8e569ad8bf/deps/rabbitmq_stream/src/rabbit_stream_reader.erl (L778)
This is pure refactoring - no functional change.
Benefits:
* code is more maintainable
* smaller functions (instead of the previous 350-line listen_loop_post_auth function)
* well-defined state transitions (e.g. useful to enforce the authentication protocol)
* we get some gen_statem helper functions for free (e.g. debug utilities)
Useful doc: https://ninenines.eu/docs/en/ranch/2.0/guide/protocols/
This way we can show how many messages were received via a certain
protocol (stream is the second real protocol besides the default amqp091
one), as well as by queue type, which is something many have asked for
over a really long time.
The most important aspect is that we can also see them by protocol AND
queue_type, which becomes very important for Streams, which have
different rules from regular queues (e.g. consuming
messages is non-destructive, and deep queue backlogs - think billions of
messages - are normal). Alerting and consumer scaling due to deep
backlogs will now work correctly, as we can distinguish between regular
queues & streams.
This has gone through a few cycles, with @mkuratczyk & @dcorbacho
covering most of the ground. @dcorbacho had most of this in
https://github.com/rabbitmq/rabbitmq-server/pull/3045, but the main
branch went through a few changes in the meantime. Rather than resolving
all the conflicts, and then making the necessary changes, we (@gerhard +
@kjnilsson) took all learnings and started re-applying a lot of the
existing code from #3045. We are confident in this approach and would
like to see it through. We continued working on this with @dumbbell, and
the most important changes are captured in
https://github.com/rabbitmq/seshat/pull/1.
We expose these global counters in rabbitmq_prometheus via a new
collector. We don't want to keep modifying the existing collector, which
grew really complex in parts, especially since we introduced
aggregation, but start with a new namespace, `rabbitmq_global_`, and
continue building on top of it. The idea is to build in parallel, and
slowly transition to the new metrics, because semantically the changes
are too big since streams, and we have been discussing protocol-specific
metrics with @kjnilsson, which makes me think that this approach is
least disruptive and... simple.
While at it, we removed redundant empty return value handling in the
channel. The function called no longer returns this.
Also removed all DONE / TODO & other comments - we'll handle them when
the time comes, no need to leave TODO reminders.
Pairs @kjnilsson @dcorbacho @dumbbell
(this is multiple commits squashed into one)
Signed-off-by: Gerhard Lazu <gerhard@lazu.co.uk>
Most tests that can start RabbitMQ nodes have some chance of
flaking. Rather than chase individual flakes for now, this commit
changes the default (though it can still be overridden, as is the case
for config_scheme_SUITE in many places, since I have yet to see that
particular suite flake).
Before this commit, sending garbage data to the server's stream port
caused the RabbitMQ node to eat more and more memory.
In this commit, we fix it by expecting the client to go through the
proper authentication sequence. Otherwise, the server closes the socket.
Co-authored-by: Michal Kuratczyk <mkuratczyk@pivotal.io>