A single active consumer must have a name, which is used as the reference
for storing offsets and, when the stream is a partition of a super stream,
as the name of the group the consumer belongs to.
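For illustration, the subscription properties a client would set might look
like this (the exact property keys are assumptions based on stream protocol
conventions, not taken from this commit):

    %% hedged illustration: property keys are assumptions
    Properties = #{<<"single-active-consumer">> => <<"true">>,
                   %% the mandatory name: used as the offset-tracking
                   %% reference and, for a super stream partition, as
                   %% the consumer group name
                   <<"name">> => <<"my-app">>}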
References #3753
A formerly active consumer can have in-flight credit
requests when it becomes inactive. This commit checks
the consumer's state on credit requests and makes sure
messages are not dispatched if it is inactive.
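A minimal sketch of the check, with illustrative record and function names
(not the actual ones):

    handle_credit(#consumer{active = true} = Consumer0, Credit, State) ->
        Consumer1 = add_credit(Consumer0, Credit),
        dispatch_to(Consumer1, State);
    handle_credit(#consumer{active = false} = Consumer0, Credit, State) ->
        %% record the credit, but do not dispatch messages to an
        %% inactive consumer
        {add_credit(Consumer0, Credit), State}.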
With SAC, stream consumers can be active or not, so these two fields
are added to the stream metrics. This is the same as with
regular consumers.
References #3753
Separate the logic between a single SAC and a SAC in a partition of a super
stream (makes the code clearer), and wait for the former active consumer's
notification before selecting the new active consumer (avoids a lock
of some sort on the group, so consumers can come and go).
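A hedged sketch of the handoff, with illustrative names (the real
coordinator state and API are richer):

    %% a consumer joins the group: ask the current active consumer to
    %% step down first
    handle_consumer_added(Group0, NewPid) ->
        {ActivePid, Group1} = begin_handoff(Group0, NewPid),
        {Group1, [{send_msg, ActivePid, {sac, deactivate}}]}.

    %% only when the former active consumer notifies us that it has
    %% stepped down do we select and activate the new active consumer
    handle_deactivation_confirmed(Group0) ->
        {NextPid, Group1} = select_next_active(Group0),
        {Group1, [{send_msg, NextPid, {sac, activate}}]}.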
References #3753
In the SAC coordinator, messages to connections cannot be sent
from the state machine itself, or they would also be sent during
replication. Only the leader applies side effects.
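A hedged sketch of the Ra machine pattern this describes; the command and
message names are illustrative:

    %% ra_machine callback: the state transition is pure; messaging the
    %% connection is returned as an effect, which Ra executes on the
    %% leader only, so replicas replaying the log stay silent
    apply(_Meta, {sac, {activate, Pid}}, State0) ->
        State1 = set_active(State0, Pid),
        Effects = [{send_msg, Pid, {sac, become_active}}],
        {State1, ok, Effects}.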
References #3753
The SAC group coordination should happen as part of the Raft state
machine, to make it more reliable and robust.
It would also benefit from the different "services" the coordinator
and Ra provide.
References #3753
Deprecate the queue-leader-locator values 'random' and 'least-leaders'.
Both become the value 'balanced'.
From now on, only the queue-leader-locator values 'client-local' and
'balanced' should be set.
'balanced' will place the leader on the node with the fewest leaders if
there are few queues, and will select a random leader if there are many
queues.
This avoids the expensive least-leaders calculation when there are many
queues.
This change also allows us to change the implementation of 'balanced' in
the future. For example, 'balanced' could place a leader on a node
depending on resource usage or available node resources.
There is no need to expose implementation details like 'random' or
'least-leaders' as configuration to users.
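For example, declaring a quorum queue with the new value via the Erlang
AMQP client (the queue name is illustrative):

    %% requires amqp_client and its records:
    %% -include_lib("amqp_client/include/amqp_client.hrl").
    Declare = #'queue.declare'{
                queue     = <<"my-queue">>,
                durable   = true,
                arguments = [{<<"x-queue-type">>, longstr, <<"quorum">>},
                             {<<"x-queue-leader-locator">>, longstr,
                              <<"balanced">>}]},
    #'queue.declare_ok'{} = amqp_channel:call(Channel, Declare).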
rabbitmq_cli uses some private rules_erlang APIs that have changed in
the upcoming release.
Additionally:
- Avoid including both standard and test versions of amqp_client in
integration test suites
- Eliminate most of the compilation order hints (explicit first_srcs)
in the bazel build
- Fix an include statement - in bazel, an app is not available to
itself as a library at compilation time
Several listeners for the same offset can be registered,
which causes problems at low rates, where consumers often hit
the end of the stream. Osiris messages then accumulate
in the process mailbox, which slows it down considerably after
some time.
This commit makes the listener registration idempotent for a given
offset.
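A minimal sketch of the guard, assuming the reader state remembers the last
offset it registered for and an osiris:register_offset_listener/2 call
(field and function names are illustrative):

    maybe_register_offset_listener(Offset,
                                   #state{listened_offset = Offset} = State) ->
        %% already registered for this offset, nothing to do
        State;
    maybe_register_offset_listener(Offset, #state{leader = Leader} = State) ->
        osiris:register_offset_listener(Leader, Offset),
        State#state{listened_offset = Offset}.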
A stream may be unavailable or in an inconsistent state, and the
stream coordinator reports this with an error that the stream
manager handles as if it were an appropriate response. The stream
protocol adapter then tries to extract the topology from the error.
This commit makes the stream manager handle the error correctly,
so that the stream protocol adapter reports the unavailability
of the stream to the client.
This is useful in case new fields are needed in future
versions. A new-version node can ask an old-version node for new
fields in a mixed-version cluster.
If the old-version node returns a known
value for unknown fields instead of failing, the new
node can set an appropriate default value for these
fields in the result of the CLI commands.
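A hedged sketch of the idea on the old node's side (the placeholder value
and field names are assumptions):

    %% answer known fields from the state, and return a known
    %% placeholder for anything unrecognised instead of crashing
    info_item(offset, #consumer{offset = Offset}) -> Offset;
    info_item(credits, #consumer{credit = Credit}) -> Credit;
    info_item(_Unknown, _Consumer) -> ''.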
Only a couple of fields of the stream consumer record change
very frequently (credits and Osiris log reference), so this commit
introduces a nested record in the main consumer record that
contains the immutable fields. This potentially avoids producing
a lot of garbage, especially when the consumer state contains
several properties (consumer name, or single active consumer information
in the future).
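A hedged sketch of the record layout, with a reduced set of fields:

    -record(consumer_configuration, {socket,
                                     stream,
                                     subscription_id,
                                     properties}).
    -record(consumer, {configuration :: #consumer_configuration{},
                       %% only these two fields change on the hot path
                       credit :: non_neg_integer(),
                       log :: osiris_log:state() | undefined}).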
Fixes #3841
Use case: allow plain connections over one IP (internal), and TLS
connections over another IP (e.g. an internet-routable IP). Without this
patch a cluster can only support access over one or the other IP, not
both.
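For illustration, with example addresses (the stream plugin exposes
analogous stream.listeners.* keys):

    # rabbitmq.conf
    listeners.tcp.internal = 192.168.0.10:5672
    listeners.ssl.external = 203.0.113.10:5671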
(cherry picked from commit b9e6aad035)
We expect to have one stream for each routing key, but
as a binding can return several queues for a given key, we
leave that possibility open in the stream protocol.
Instead of injecting it into various places inside the code.
When the Osiris log is closed, it decrements the global "readers"
counter, which is why it is much safer to do this in terminate.
Otherwise metrics will not get cleaned up correctly when processes crash.
It's also tidier to do this in a single place, in terminate/3.
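A minimal sketch of the terminate/3 callback, assuming a state record that
holds the Osiris log (the record and field names are illustrative):

    terminate(_Reason, _StateName,
              #stream_connection_state{log = undefined}) ->
        ok;
    terminate(_Reason, _StateName, #stream_connection_state{log = Log}) ->
        %% closing the log decrements the global "readers" counter
        osiris_log:close(Log).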
Pair: @kjnilsson
Signed-off-by: Gerhard Lazu <gerhard@lazu.co.uk>
since it fails with Bazel.
As discussed with @pjk25, let's set this value via the application env,
making it configurable for the test, but not for the user.
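A hedged sketch of the lookup, with a hypothetical key and default (not the
actual ones):

    Value = application:get_env(rabbitmq_stream, some_setting, 10000)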
Add state timeouts.
If the client takes more than 10s for a single step in the authentication
protocol, make the server close the TCP connection.
Also close the TCP connection if the server times out in state
close_sent. That's the case when the client sends an invalid command
(after successful authentication), the server requests the client to
close the connection, but the client doesn't respond anymore.
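A hedged sketch of the gen_statem state timeout pattern used here; the
state names are illustrative:

    init(Socket) ->
        %% the client has 10s to complete the first authentication step
        {ok, peer_properties_exchange, Socket,
         [{state_timeout, 10000, close}]}.

    peer_properties_exchange(state_timeout, close, Data) ->
        %% the client took more than 10s: close the TCP connection
        {stop, {shutdown, authentication_timeout}, Data};
    peer_properties_exchange(info, {tcp, _Socket, _Frame}, Data) ->
        %% handle the frame, then give the next step its own 10s budget
        {next_state, sasl_exchange, Data,
         [{state_timeout, 10000, close}]}.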
The default gen_server timeout is not enough to list busy connections.
Setting it to infinity allows the caller to decide the timeout,
as classic queues do. The `emit_info` family of functions sets its
own timeout for all the CLI commands.
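For example, from the calling side (the request term is illustrative):

    gen_server:call(ConnectionPid, {info, InfoItems}, infinity)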
Calling ensure_stats_timer after init_stats_timer and reset_stats_timer
is enough.
The idea is to call stop_stats_timer before hibernation and
ensure_stats_timer on wakeup. However, since we never call
stop_stats_timer in rabbit_stream_reader, we don't need to call
ensure_stats_timer on every network activity.
Before this commit, the test AlarmsTest.diskAlarmShouldNotPreventConsumption
of the Java client was failing.
When executing that test, the server failed with:
2021-06-25 16:11:02.886935+02:00 [error] <0.1301.0> exception exit: {unexpected_message,resume}
2021-06-25 16:11:02.886935+02:00 [error] <0.1301.0> in function rabbit_heartbeat:heartbeater/3 (src/rabbit_heartbeat.erl, line 138
because an attempt was made to resume the heartbeater without it having
been paused first.
The above exception also occurs on the master branch when executing this
test. However, the test falsely succeeds on master because the following
FIXME was never implemented:
8e569ad8bf/deps/rabbitmq_stream/src/rabbit_stream_reader.erl (L778)
This is pure refactoring - no functional change.
Benefits:
* code is more maintainable
* smaller functions (instead of the previous 350-line listen_loop_post_auth function)
* well-defined state transitions (e.g. useful to enforce the authentication protocol)
* we get some gen_statem helper functions for free (e.g. debug utilities)
Useful doc: https://ninenines.eu/docs/en/ranch/2.0/guide/protocols/
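A hedged outline of the resulting shape, one callback per connection state
(the module and state names are illustrative, not the actual ones):

    -module(reader_sketch).
    -behaviour(gen_statem).
    -export([callback_mode/0, init/1, tcp_connected/3, authenticated/3]).

    callback_mode() -> state_functions.

    init(Socket) -> {ok, tcp_connected, Socket}.

    %% one function per protocol state, instead of one big receive loop
    tcp_connected(info, {tcp, _Socket, _Frame}, Data) ->
        %% peer properties / SASL handling would happen here
        {next_state, authenticated, Data}.

    authenticated(info, {tcp, _Socket, _Frame}, Data) ->
        {keep_state, Data}.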
This way we can show how many messages were received via a certain
protocol (stream is the second real protocol besides the default amqp091
one), as well as by queue type, which is something many have been asking
for for a really long time.
The most important aspect is that we can also see them by protocol AND
queue_type, which becomes very important for streams, which have
different rules from regular queues (for example, consuming
messages is non-destructive, and deep queue backlogs - think billions of
messages - are normal). Alerting and consumer scaling due to deep
backlogs will now work correctly, as we can distinguish between regular
queues & streams.
This has gone through a few cycles, with @mkuratczyk & @dcorbacho
covering most of the ground. @dcorbacho had most of this in
https://github.com/rabbitmq/rabbitmq-server/pull/3045, but the main
branch went through a few changes in the meantime. Rather than resolving
all the conflicts, and then making the necessary changes, we (@gerhard +
@kjnilsson) took all learnings and started re-applying a lot of the
existing code from #3045. We are confident in this approach and would
like to see it through. We continued working on this with @dumbbell, and
the most important changes are captured in
https://github.com/rabbitmq/seshat/pull/1.
We expose these global counters in rabbitmq_prometheus via a new
collector. We don't want to keep modifying the existing collector, which
grew really complex in parts, especially since we introduced
aggregation; instead we start with a new namespace, `rabbitmq_global_`,
and continue building on top of it. The idea is to build in parallel and
slowly transition to the new metrics, because the changes are
semantically too big since streams, and we have been discussing
protocol-specific metrics with @kjnilsson, which makes me think that this
approach is the least disruptive and... simple.
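For illustration, the new namespace might surface like this in the
Prometheus endpoint (the values and exact label names are examples, not
actual output):

    rabbitmq_global_messages_received_total{protocol="amqp091"} 161
    rabbitmq_global_messages_received_total{protocol="stream"} 725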
While at it, we removed redundant empty-return-value handling in the
channel; the function called no longer returns this.
Also removed all DONE / TODO & other comments - we'll handle them when
the time comes, no need to leave TODO reminders.
Pairs @kjnilsson @dcorbacho @dumbbell
(this is multiple commits squashed into one)
Signed-off-by: Gerhard Lazu <gerhard@lazu.co.uk>
Before this commit, sending garbage data to the server's stream port
caused the RabbitMQ node to eat more and more memory.
In this commit, we fix it by expecting the client to go through the
proper authentication sequence. Otherwise, the server closes the socket.
Co-authored-by: Michal Kuratczyk <mkuratczyk@pivotal.io>
This is an important metric to keep track of and be aware of (maybe even
alert on) when consumers fall behind consuming stream messages. While
they should be able to catch up, if they fall behind too much and the
stream gets truncated, they may miss messages.
This is something that we want to expose via Prometheus metrics as well,
but we've started closer to the core, CLI & Management.
This should be merged as soon as it passes CI, we shouldn't wait on the
Prometheus changes - they can come later.
Pair: @kjnilsson
Signed-off-by: Gerhard Lazu <gerhard@lazu.co.uk>
To make sure the PID is alive, as the Mnesia record can be stale after a
failure.
Also make the local PID lookup in the stream coordinator do a consistent
query over the cluster if the PID is not alive.
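A hedged sketch of the liveness check (the function name is illustrative);
erlang:is_process_alive/1 only works for local PIDs, hence the rpc call
for remote ones:

    is_member_alive(Pid) when node(Pid) =:= node() ->
        erlang:is_process_alive(Pid);
    is_member_alive(Pid) ->
        rpc:call(node(Pid), erlang, is_process_alive, [Pid]) =:= true.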
Co-authored-by: Karl Nilsson <kjnilsson@users.noreply.github.com>
Lager strips trailing newline characters, but the OTP logger with the
default formatter adds a newline at the end. To avoid unintentional
multi-line log messages, we have to revisit most logged messages.
Some log entries are intentionally multi-line; others
are printed directly to stdout, where newlines are required
for sensible formatting.
Rabbit channels are responsible for this check before calling declare;
skipping it in the manager meant that the queue was partly redeclared
and a new data directory was created. The old one was still on disk with
a different timestamp, but from the user's point of view the queue data
had been erased.
They are a bit more defensive. The subscription is also now more
reliable, returning a stream-not-available code if necessary.
Also set the Aten poll interval to 1 second (bumped to 5 seconds on
master now).
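For reference, the interval can be set via the aten application
environment, e.g. in advanced.config:

    [{aten, [{poll_interval, 1000}]}].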
This is done after checking rabbit_queue, and only if it returns that the
queue does not exist. The coordinator may be recovering the queue, so
thanks to this double check we know the queue exists but is not
available, instead of thinking it does not exist at all.