by changing the lqueue state.
This results in less memory usage and hence less garbage collection
when queue functions are called many thousand times per second.
The subset of functions lqueue supports is copied from
OTP's queue module and extended to keep the length up to date.
lqueue's foldr/3 is deleted since there is no usage.
lqueue's foldl/3 is renamed to fold/3 to match OTP's queue naming.
lqueue accepts both old and new state, but returns only new state.
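To illustrate the resulting API (a sketch; only the functions mentioned
above are assumed to exist, mirroring OTP's queue module):

``` erlang
%% lqueue mirrors OTP's queue API while tracking the length, so len/1 is O(1).
Q0 = lqueue:new(),
Q1 = lqueue:in(msg1, Q0),
Q2 = lqueue:in(msg2, Q1),
2  = lqueue:len(Q2),                                      %% no traversal needed
{{value, msg1}, Q3} = lqueue:out(Q2),
1  = lqueue:fold(fun(_Msg, Acc) -> Acc + 1 end, 0, Q3).   %% fold/3, not foldl/3
```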
We now know that we will not be expecting transient messages
after clean restarts, and we can preserve the order of
persistent messages that were confirmed.
This gets around race condition related problems caused by
the client sending a basic.cancel while the server sends
a basic.cancel itself. This also solves a similar issue,
which I have not investigated thoroughly, where the server
delivers a message.
Details about the basic.cancel race condition issue can
be found in https://github.com/rabbitmq/rabbitmq-server/issues/4070
The added command resulted in the following additional changes:
- Allow more queue restarts for test purposes
- Fix crashes that may happen following a queue crash+restart
- Fix reading from the index after crash+restart where messages
may be both in q1 and read from the index
- Fix a race condition when stopping a node while a queue
  crashes and restarts: make the message store lookup use pg
This helps us test cases where the queue restarts cleanly.
Because it is not completely deterministic, there is some
careful handling of messages: we accept messages that were
acked by the client but whose acks the server did not process
before the restart, as well as messages published without
confirms that never made it to the server before it restarted.
There is potential to improve confirms handling for that
scenario but that is left as an exercise for a later time.
On dirty recovery the count in the segment file was already
accurate. It was not accurate otherwise as it assumed that
all messages would be written to the index, which is not
the case in the current implementation.
Removed the now-pointless command checking for process liveness.
Increased the number of checks to 500 (5x the default).
Added weights for the commands: publish/get at 900 and
set mode/version at 100.
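Conceptually, the weighting amounts to biasing the command generator,
e.g. with PropEr's frequency/1 (a sketch; the suite's real generators
and argument lists are not shown):

``` erlang
%% Generate publish/get roughly nine times more often than mode/version changes.
command(State) ->
    frequency([{900, {call, ?MODULE, publish,     [State]}},
               {900, {call, ?MODULE, get,         [State]}},
               {100, {call, ?MODULE, set_mode,    [State]}},
               {100, {call, ?MODULE, set_version, [State]}}]).
```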
Because queues deliver messages sequentially we do not need to
keep track of delivers per message, we just need to keep track
of the highest message that was delivered, via its seq_id().
This allows us to avoid updating the index and storing data
unnecessarily and can help simplify the code (not seen in this
WIP commit because the code was left there or commented out
for the time being).
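A minimal sketch of the idea (illustrative names, not the actual queue
implementation's records):

``` erlang
%% Keep only the highest delivered seq_id() instead of a per-message flag.
mark_delivered(SeqId, HighestDelivered) when SeqId > HighestDelivered ->
    SeqId;               %% advance the watermark, nothing stored per message
mark_delivered(_SeqId, HighestDelivered) ->
    HighestDelivered.

was_delivered(SeqId, HighestDelivered) ->
    SeqId =< HighestDelivered.
```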
Includes a few small bug fixes.
This currently works both with and without confirms.
It currently always writes to the per-queue store;
it would be good to write fan-out messages to the
shared store though.
It would be good to remove the usage of MsgId except
when the shared store is needed.
bazel-erlang has been renamed rules_erlang. v2 is a substantial
refactor that brings Windows support. While this alone isn't enough to
run all rabbitmq-server suites on Windows, one can at least now start
the broker (bazel run broker) and run the tests that do not start a
background broker process.
In order to retain deterministic results of state machine applications
during upgrades we need to make the stream coordinator versioned such
that we only use the new logic once the stream coordinator switches to
machine version 1.
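Roughly, this relies on the ra_machine versioning callbacks (a sketch;
treat the exact callback shapes as an assumption):

``` erlang
%% In the stream coordinator's ra_machine module:
version() -> 1.                                %% bump when apply/3 semantics change

which_module(0) -> rabbit_stream_coordinator;  %% the same module handles both versions
which_module(1) -> rabbit_stream_coordinator.
```

apply/3 can then branch on the effective machine version so that the new
logic only kicks in once every member has switched to version 1.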
For booleans, we can prefer the operator policy value
unconditionally, without any safety implications.
Per discussion with @binarin @pjk25
(cherry picked from commit 6edb7396fd)
A channel that first sends a mandatory publish before enabling
confirms mode may not receive confirms for messages published
after that. This is because the publish_seqno was increased
also for mandatory publishes even if confirms were disabled.
But the mandatory feature has nothing to do with publish_seqno.
The issue exists since at least
38e5b687de
The test case introduced focuses on multiple=false. The issue
also exists for multiple=true but it has a different impact:
sending multiple=true,delivery_tag=2 results in both messages
1 and 2 being acked, even if message 2 doesn't exist as far
as the client is concerned. If the message does exist
it might get confirmed earlier than it should have been. The
issue is a bigger problem the more mandatory messages were
sent before enabling confirms mode.
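A hedged sketch of the invariant the fix restores (the real channel
state and function names differ):

``` erlang
%% publish_seqno belongs to the confirms mechanism only; a mandatory
%% publish with confirms disabled must not bump it.
maybe_incr_publish_seqno(#{confirm_enabled := true, publish_seqno := Seq} = State) ->
    State#{publish_seqno := Seq + 1};
maybe_incr_publish_seqno(State) ->
    State.
```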
Before this commit, the tests were not including any settle, return, or
discard Ra commands.
Do not pattern match against 'ra_event' because nowadays:
_Opts = [local, ra_event]
From the coordinator's POV each stream has a unique id consisting of the
vhost, queue name and a high-resolution timestamp, even if several stream ids
relate to the same queue record.
When performing the mnesia update the coordinator now checks that the current stream id
matches that of the update_mnesia action and does not change the queue record if
the stream id is not the same.
This should avoid "old" incarnations of a stream queue updating newer ones
with incorrect information.
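Roughly, the guard looks like this (illustrative names; only
rabbit_misc:execute_mnesia_transaction/1 is an existing helper):

``` erlang
%% Apply an update_mnesia action only if it was issued for the current
%% incarnation of the stream.
maybe_update_mnesia(StreamId, StreamId, UpdateFun) ->
    rabbit_misc:execute_mnesia_transaction(UpdateFun);
maybe_update_mnesia(_ActionStreamId, _CurrentStreamId, _UpdateFun) ->
    ok.   %% stale stream id: leave the queue record untouched
```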
This is meant to be used by deployment tools, core features and
plugins that expect a certain minimum number of cluster nodes to be
present. For example, certain setup steps in distributed plugins might
require at least three nodes to be available.
This is just a hint, not an enforced requirement. The default value is
1 so that for single node clusters, there would be no behavior changes.
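If I remember the key name correctly, the hint is set like this in
rabbitmq.conf (treat the exact key as an assumption):

``` ini
cluster_formation.target_cluster_size_hint = 3
```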
Deriving a max-cluster-size only from running nodes would create situations where,
in a three-node cluster with only two nodes running, a non-running
node could be selected as follower.
As AMQP 0.9.1 headers are translated into AMQP 1.0 application properties,
they cannot contain complex values such as arrays or tables.
RabbitMQ federation does use array and table values, so to avoid crashing when
delivering a federated message to a stream queue we drop them. These header values
should be considered internal, however, so dropping them before the final queue delivery should not be a huge problem.
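The filtering boils down to something like this, assuming the usual
{Name, Type, Value} header triples of the Erlang AMQP 0.9.1 codec (a
sketch, not the actual conversion code):

``` erlang
%% Drop header values that have no AMQP 1.0 application property
%% representation (arrays and tables); keep simple values.
drop_complex_headers(undefined) ->
    undefined;
drop_complex_headers(Headers) ->
    [Header || {_Name, Type, _Value} = Header <- Headers,
               Type =/= table, Type =/= array].
```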
The suite-level timeout in the .erl, I've learned, is actually per
test case. By sharding by testcase, we can better match the common test
level and bazel level timeouts, such that we can get logs from remote
test run failures.
Previously the bazel timeout and common test timeout were equal, which
meant that in practice the bazel timeout was often reached first, in
which case we don't receive the test logs
If this is not done apps that consume/cancel from empty queues in a loop
will grow the raft log in an unbounded manner. This could also be the
case for the garbage_collect command.
If the queue is empty when a consumer is cancelled it would leave the
consumer id inside the service queue. If an application subscribes/unsubscribes
in a loop from an empty queue this would cause the service queue to never be
cleared up.
NB: whenever we make a change to how the quorum queue state machine is
calculated we need to consider how this affects determinism, as during an
upgrade different members may calculate a different service queue state.
In this case it should be ok as they will eventually converge on the same
state once all "dead" consumer ids have been removed from the queue.
In any case it should not affect how messages are assigned to consumers.
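A minimal sketch of the cleanup, assuming the service queue is an OTP
queue of consumer ids (the real rabbit_fifo state is more involved):

``` erlang
%% On basic.cancel of a consumer on an empty queue, make sure its id does
%% not linger in the service queue.
remove_from_service_queue(ConsumerId, ServiceQueue) ->
    queue:filter(fun(Id) -> Id =/= ConsumerId end, ServiceQueue).
```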
Two testcases in the original suite fail if the test is run as the
root user. Currently under remote execution with bazel this is the
only working option. There is a workaround in place, but the entire
suite when run that way takes around 12 minutes. This splits the suite
so that the minimal set of cases is executed using the slower workaround.
Rather than sleeping for 6 seconds, we want to check repeatedly, for up
to 30 seconds, that the replica has recovered, and either eventually
succeed, or fail if it does not recover within 30 seconds, the default
await_condition time interval.
Pair: @kjnilsson
Signed-off-by: Gerhard Lazu <gerhard@lazu.co.uk>
Prior to this change, exclusive queues have been subject to the queue
location process, just like other queues. Therefore, if
queue_master_locator was not client-local and x-queue-master-locator was
not set to client-local, an exclusive queue was likely to be located on
a different node than the connection it is exclusive to. This is
suboptimal and may lead to inconsistencies when the queue's node goes
down while the connection's node is still up.
The classic local filesystem source is still supported
using the same traditional configuration key, load_definitions.
Configuration schema follows peer discovery in spirit:
* definitions.import_backend configures the mechanism to use,
which can be a module provided by a plugin
* definitions.* keys can be defined by plugins and contain any
keys a specific mechanism needs
For example, the classic local filesystem source can now be
configured like this:
``` ini
definitions.import_backend = local_filesystem
definitions.local.path = /path/to/definitions.d/definition.json
```
An HTTPS definitions source can be configured similarly:
``` ini
definitions.import_backend = https
definitions.https.url = https://hostname/path/to/definitions.json
```
HTTPS may require additional configuration keys related to TLS/x.509
peer verification. Such extra keys will be added as the need for them
becomes evident.
References #3249
The code was passing a number (the timestamp) to
unicode:characters_to_binary/1 which expects an iolist to convert to
UTF-8.
We now verify if we have a number before calling that function. If this
is a number (integer or float), we keep it as is because JSON supports
that type.
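The check amounts to something like this (a sketch; the surrounding
module and function names are omitted):

``` erlang
%% Numbers are already valid JSON values; only convert chardata.
to_json_value(Value) when is_integer(Value); is_float(Value) ->
    Value;
to_json_value(Value) ->
    unicode:characters_to_binary(Value).
```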
Unlike with gnu make, mixed version testing with bazel uses a package-generic-unix for the secondary umbrella rather than the source. This brings the benefit of being able to mixed version test releases built with older erlang versions (even though all nodes will run under the single version given to bazel)
This introduces new test labels, adding a `-mixed` suffix for every existing test. They can be skipped if necessary with `--test_tag_filters` (see the github actions workflow for an example)
As part of the change, it is now possible to run an old release of rabbit with rabbitmq_run rule, such as:
`bazel run @rabbitmq-server-generic-unix-3.8.17//:rabbitmq-run run-broker`
Recovering from an existing queue is fine but if a node is restarted when
there are no longer stream queues on the system, the recovery process won't
restart the pre-existing coordinator as that's only performed on queue recovery.
The first attempt to declare a new stream queue on this cluster will crash with
`coordinator unavailable` error, as it only restarts the local coordinator
and not the whole ra cluster, thus lacking quorum.
Recovering the coordinator during the boot process ensures that a pre-existing
coordinator cluster is restarted in any case, and does nothing if there was
never a coordinator on the node.
Other tests (that produce flakes) arguably test classic mirrored
queues, a deprecated feature reasonably well
covered in other suites.
Per discussion with @gerhard.
This way we can show how many messages were received via a certain
protocol (stream is the second real protocol besides the default amqp091
one), as well as by queue type, which is something that many have
requested for a really long time.
The most important aspect is that we can also see them by protocol AND
queue_type, which becomes very important for Streams, which have
different rules from regular queues (for example, consuming
messages is non-destructive, and deep queue backlogs - think billions of
messages - are normal). Alerting and consumer scaling due to deep
backlogs will now work correctly, as we can distinguish between regular
queues & streams.
This has gone through a few cycles, with @mkuratczyk & @dcorbacho
covering most of the ground. @dcorbacho had most of this in
https://github.com/rabbitmq/rabbitmq-server/pull/3045, but the main
branch went through a few changes in the meantime. Rather than resolving
all the conflicts, and then making the necessary changes, we (@gerhard +
@kjnilsson) took all learnings and started re-applying a lot of the
existing code from #3045. We are confident in this approach and would
like to see it through. We continued working on this with @dumbbell, and
the most important changes are captured in
https://github.com/rabbitmq/seshat/pull/1.
We expose these global counters in rabbitmq_prometheus via a new
collector. We don't want to keep modifying the existing collector, which
grew really complex in parts, especially since we introduced
aggregation, but start with a new namespace, `rabbitmq_global_`, and
continue building on top of it. The idea is to build in parallel, and
slowly transition to the new metrics, because semantically the changes
are too big since streams, and we have been discussing protocol-specific
metrics with @kjnilsson, which makes me think that this approach is
least disruptive and... simple.
While at it, we removed redundant empty return value handling in the
channel; the function being called no longer returns this.
Also removed all DONE / TODO & other comments - we'll handle them when
the time comes, no need to leave TODO reminders.
Pairs @kjnilsson @dcorbacho @dumbbell
(this is multiple commits squashed into one)
Signed-off-by: Gerhard Lazu <gerhard@lazu.co.uk>
In case the removed node hosts a leader, it takes a moment for
the QQ to elect a new one and begin accepting cluster
membership change operations again.
(cherry picked from commit a9d8816c6a)
Mark per_user_connection_channel_tracking_SUITE:cluster_size_2_network
as not mixed version compatible.
In a mixed 3.8/3.9 cluster, changes to rabbit_core_ff.erl imply that
some feature flag related migrations cannot occur, and therefore
user_limits cannot be enabled as required by the test
quorum_unaffected_after_vhost_failure isn't mixed versions compatible as
it tries to declare a queue in a mixed cluster from a node running Ra 1.x where all other
nodes are running Ra 2.0.
simple_confirm_availability_on_leader_change can't be made forwards compatible
as when running in mixed mode the queue declaration happens on an old node in
a cluster of mostly new nodes. As new nodes run Ra 2.0 and Ra 1.x does not know
how to create members on Ra 2.0 nodes this test fails. This is an acceptable limitation
for a transient mixed versions cluster.
Now that a policy overwrites queue arguments, running policy tests in
parallel with other tests leads to non-deterministic test results with
some tests randomly failing.
On initial cluster formation, only one node in a multi node cluster
should initialize the Mnesia database schema (i.e. form the cluster).
To ensure that for nodes starting up in parallel,
RabbitMQ peer discovery backends have used
either locks or randomized startup delays.
Locks work great: When a node holds the lock, it either starts a new
blank node (if there is no other node in the cluster), or it joins
an existing node. This makes it impossible to have two nodes forming
the cluster at the same time.
Consul and etcd peer discovery backends use locks. The lock is acquired
in the consul and etcd infrastructure, respectively.
For other peer discovery backends (classic, DNS, AWS), randomized
startup delays were used. They work well enough in most cases.
However, in https://github.com/rabbitmq/cluster-operator/issues/662 we
observed that in 1% - 10% of the cases (the more nodes or the
smaller the randomized startup delay range, the higher the chances), two
nodes decide to form the cluster. That's bad since it ends up with a
single Erlang cluster, but two RabbitMQ clusters. Even worse, no
obvious alert got triggered or error message logged.
To solve this issue, one could increase the randomized startup delay
range from e.g. 0m - 1m to 0m - 3m. However, this makes initial cluster
formation very slow since it will take up to 3 minutes until
every node is ready. In rare cases, we still end up with two nodes
forming the cluster.
Another way to solve the problem is to name a dedicated node to be the
seed node (forming the cluster). This was explored in
https://github.com/rabbitmq/cluster-operator/pull/689 and works well.
Two minor downsides to this approach are: 1. If the seed node never
becomes available, the whole cluster won't be formed (which is okay),
and 2. it doesn't integrate with existing dynamic peer discovery backends
(e.g. K8s, AWS) since nodes are not yet known at deploy time.
In this commit, we take a better approach: We remove randomized startup
delays altogether. We replace them with locks. However, instead of
implementing our own lock implementation in an external system (e.g. in K8s),
we re-use Erlang's locking mechanism global:set_lock/3.
global:set_lock/3 has some convenient properties:
1. It accepts a list of nodes to set the lock on.
2. The nodes in that list connect to each other (i.e. create an Erlang
cluster).
3. The method is synchronous with a timeout (number of retries). It
blocks until the lock becomes available.
4. If a process that holds a lock dies, or the node goes down, the lock
held by the process is deleted.
The list of nodes passed to global:set_lock/3 corresponds to the nodes
the peer discovery backend discovers (lists).
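For illustration, acquiring the lock around cluster formation could
look like this (discovered_nodes/0 and join_or_form_cluster/1 are
placeholders; global:set_lock/3 and global:del_lock/2 are the real
APIs):

``` erlang
Nodes  = discovered_nodes(),                    %% whatever the backend listed
LockId = {rabbitmq_cluster_formation, node()},  %% {ResourceId, LockRequesterId}
case global:set_lock(LockId, Nodes, 60) of      %% retry up to 60 times
    true ->
        try
            join_or_form_cluster(Nodes)
        after
            global:del_lock(LockId, Nodes)
        end;
    false ->
        {error, could_not_acquire_startup_lock}
end.
```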
Two special cases worth mentioning:
1. That list can be all desired nodes in the cluster
(e.g. in classic peer discovery where nodes are known at
deploy time) while only a subset of nodes is available.
In that case, global:set_lock/3 still sets the lock without
blocking until all nodes can be connected to. This is good since
nodes might start sequentially (non-parallel).
2. In dynamic peer discovery backends (e.g. K8s, AWS), this
list can be just a subset of desired nodes since nodes might not startup
in parallel. That's also not a problem as long as the following
requirement is met: "The peer discovery backend does not list two disjoint
sets of nodes (on different nodes) at the same time."
For example, in a 2-node cluster, the peer discovery backend must not
list only node 1 on node 1 and only node 2 on node 2.
Existing peer discovery backends fulfil that requirement because the
resource the nodes are discovered from is global.
For example, in K8s, once node 1 is part of the Endpoints object, it
will be returned on both node 1 and node 2.
Likewise, in AWS, once node 1 started, the described list of instances
with a specific tag will include node 1 when the AWS peer discovery backend
runs on node 1 or node 2.
Removing randomized startup delays also makes cluster formation
considerably faster (up to 1 minute faster if that was the
upper bound in the range).
x-stream-offset supports "friendly" relative timebase specifications
such as 100s. A recent change introduced a validation of the x-stream-offset
that disallowed such specs.
This introduces a backup mechanism that can be controlled
by plugins via policies.
Benchmarks suggest the cost of this change on
Erlang 24 is well under 1%. With a stream target, it is less
than routing to one extra queue of the same type (e.g. a quorum queue).
The new default of 2048 was chosen based on various scenarios.
It provides much better memory usage when many queues are used
(allowing one host to go from 500 queues to 800+ queues) and
there seems to be no or only negligible performance cost (< 1%)
for single queues.
In the case where there are some messages kept in memory mixed with
some that are not, it is possible that messages are delivered to the
consuming channel with gaps/out of order, which would in some cases cause
the channel to treat them as re-sends it has already seen and just
discard them. When this happens the messages get stuck in the consumer
state inside the queue and are never seen by the client consumer and
thus never acked. When this happens the release cursors can't be emitted
as the smallest raft index will be one of the stuck messages.
for usability. It is not any different from when a float value
is used and only exists as a counterpart to '{absolute, N}'.
Also nothing changes for rabbitmq.conf users as that format performs
validation and correct value translation.
See #2694, #2965 for background.
Now that the Cuttlefish schema sets default values for the application
environment in `{rabbit, [{log, ...}]}`, the values set in the testsuite
using application:setenv() are overwritten.
By using the $RABBITMQ_LOGS environment variable, we can override those
default values.
This suite contains only one group, but is long enough to warrant
sharding. This is probably a bit of a time penalty in absolute terms
because init_per_suite and init_per_group re-run in each shard.
This is not exposed to the end user (yet) through the Cuttlefish
configuration. But this is required to make logging_SUITE timezone
agnostic (i.e. the timezone of the host running the testsuite should not
affect the formatted times).
The design of the rabbit_ct_config_schema helper makes it impossible to
do pattern matching and thus handle default values in the schema. As a
consequence, the helper explicitly removes the `{rabbit, {log, _}}`
configuration key to work around this limitation until a proper solution
is implemented and all testsuites rewritten. See
rabbitmq/rabbitmq-ct-helpers@b1f1f1ce68.
Therefore, we can't test log configuration variables anymore using this
helper. That's ok because logging_SUITE already tests many things.
In addition to the existing configuration variables to configure
logging, the following variables were added to extend the settings.
log.*.formatter = plaintext | json
Selects between the plain text (default) and JSON formatters.
log.*.formatter.time_format = rfc3339_space | rfc3339_T | epoch_usecs | epoch_secs | lager_default
Configures how the timestamp should be formatted. It has several
values to get RFC3339 date & time, Epoch-based integers and Lager
default format.
log.*.formatter.level_format = lc | uc | lc3 | uc3 | lc4 | uc4
Configures how to format the level. Things like uppercase vs.
lowercase, full vs. truncated.
Examples:
lc: debug
uc: DEBUG
lc3: dbg
uc3: DBG
lc4: dbug
uc4: DBUG
log.*.formatter.single_line = on | off
Indicates if multi-line messages should be reformatted as a
single-line message. A multi-line message is converted to a
single-line message by joining all lines and separating them
with ", ".
log.*.formatter.plaintext.format
Set to a pattern to indicate the format of the entire message. The
format pattern is a string with $-based variables. Each variable
corresponds to a field in the log event. Here is a non-exhaustive list
of common fields:
time
level
msg
pid
file
line
Example:
$time [$level] $pid $msg
log.*.formatter.json.field_map
Indicates if fields should be renamed or removed, and the ordering
which they should appear in the final JSON object. The order is set by
the order of fields in that configuration variable.
Example:
time:ts level msg *:-
In this example, `time` is renamed to `ts`. `*:-` tells to remove all
fields not mentioned in the list. In the end the JSON object will
contain the fields in the following order: ts, level, msg.
log.*.formatter.json.verbosity_map
Indicates if a verbosity field should be added and how it should be
derived from the level. If the verbosity map is not set, no verbosity
field is added to the JSON object.
Example:
debug:2 info:1 notice:1 *:0
In this example, debug verbosity is 2, info and notice verbosity is 1,
other levels have a verbosity of 0.
All of them work with the console, exchange, file and syslog outputs.
The console output has specific variables too:
log.console.stdio = stdout | stderr
Indicates if stdout or stderr should be used. The default is stdout.
log.console.use_colors = on | off
Indicates if colors should be used in log messages. The default
depends on the environment.
log.console.color_esc_seqs.*
Indicates how each level is mapped to a color. The value can be any
string but the idea is to use an ANSI escape sequence.
Example:
log.console.color_esc_seqs.error = \033[1;31m
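Putting a few of these together, a configuration could look like this
(values chosen purely for illustration):

``` ini
log.file.level = info
log.file.formatter = json
log.file.formatter.time_format = epoch_usecs
log.file.formatter.json.field_map = time:ts level msg *:-
log.console.formatter = plaintext
log.console.formatter.plaintext.format = $time [$level] $pid $msg
log.console.use_colors = on
```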
V2: A custom time format pattern was introduced, first using variables,
then a reference date & time (e.g. "Mon 2 Jan 2006"), thanks to
@ansd. However, we decided to remove it for now until we have a
better implementation of the reference date & time parser.
V3: The testsuite was extended to cover new settings as well as the
syslog output. To test it, a fake syslogd server was added (Erlang
process, part of the testsuite).
V4: The dependency to cuttlefish is moved to rabbitmq_prelaunch which
actually uses the library. The version is updated to 3.0.1 because
we need Kyorai/cuttlefish#25.
Adds WORKSPACE.bazel, BUILD.bazel & *.bzl files for partial build & test with Bazel. Introduces a build-time dependency on https://github.com/rabbitmq/bazel-erlang
... if it is set in the configuration file.
Here is an example of that use case:
* The official Docker image sets RABBITMQ_LOGS=- in the environment
* A user of that image adds a configuration file with:
log.console.level = debug
The initial implementation, introduced in rabbitmq/rabbitmq-server#2861,
considered that if the output is overridden in the environment (through
$RABBITMQ_LOGS), any output configuration in the configuration file is
ignored.
The problem is that the output-specific configuration could also set the
log level which is not changed by $RABBITMQ_LOGS. This patch fixes that
by keeping the log level from the configuration (if it is set, obviously)
even if the output is overridden in the environment.
Sleep for 5s after a failure due to a node being down before reporting
back to stream coordinator (which will immediately retry).
stream coordinator: correct command type spec
tidy up
fix rabbit_fifo_prop tests
stream coord: add function for member state query
The configuration remains the same for the end-user. The only exception
is the log root directory: it is now set through the `log_root`
application env. variable in `rabbit`. People using the Cuttlefish-based
configuration file are not affected by this exception.
The main change is how the logging facility is configured. It now
happens in `rabbit_prelaunch_logging`. The `rabbit_lager` module is
removed.
The supported outputs remain the same: the console, text files, the
`amq.rabbitmq.log` exchange and syslog.
The message text format slightly changed: the timestamp is more precise
(now to the microsecond) and the level can be abbreviated to always be
4 characters long to align all messages and improve readability. Here is
an example:
2021-03-03 10:22:30.377392+01:00 [dbug] <0.229.0> == Prelaunch DONE ==
2021-03-03 10:22:30.377860+01:00 [info] <0.229.0>
2021-03-03 10:22:30.377860+01:00 [info] <0.229.0> Starting RabbitMQ 3.8.10+115.g071f3fb on Erlang 23.2.5
2021-03-03 10:22:30.377860+01:00 [info] <0.229.0> Licensed under the MPL 2.0. Website: https://rabbitmq.com
The example above also shows that multiline messages are supported and
each line is prepended with the same prefix (the timestamp, the level
and the Erlang process PID).
JSON is also supported as a message format and now for any outputs.
Indeed, it is possible to use it with e.g. syslog or the exchange. Here
is an example of a JSON-formatted message sent to syslog:
Mar 3 11:23:06 localhost rabbitmq-server[27908] <0.229.0> - {"time":"2021-03-03T11:23:06.998466+01:00","level":"notice","msg":"Logging: configured log handlers are now ACTIVE","meta":{"domain":"rabbitmq.prelaunch","file":"src/rabbit_prelaunch_logging.erl","gl":"<0.228.0>","line":311,"mfa":["rabbit_prelaunch_logging","configure_logger",1],"pid":"<0.229.0>"}}
For quick testing, the values accepted by the `$RABBITMQ_LOGS`
environment variables were extended:
* `-` still means stdout
* `-stderr` means stderr
* `syslog:` means syslog on localhost
* `exchange:` means logging to `amq.rabbitmq.log`
`$RABBITMQ_LOG` was also extended. It now accepts a `+json` modifier (in
addition to the existing `+color` one). With that modifier, messages are
formatted as JSON instead of plain text.
The `rabbitmqctl rotate_logs` command is deprecated. The reason is
Logger does not expose a function to force log rotation. However, it
will detect when a file was rotated by an external tool.
From a developer point of view, the old `rabbit_log*` API remains
supported, though it is now deprecated. It is implemented as regular
modules: there is no `parse_transform` involved anymore.
In the code, it is recommended to use the new Logger macros. For
instance, `?LOG_INFO(Format, Args)`. If possible, messages should be
augmented with some metadata. For instance (note the map after the
message):
?LOG_NOTICE("Logging: switching to configured handler(s); following "
"messages may not be visible in this log output",
#{domain => ?RMQLOG_DOMAIN_PRELAUNCH}),
Domains in Erlang Logger parlance are the way to categorize messages.
Some predefined domains, matching previous categories, are currently
defined in `rabbit_common/include/logging.hrl` or headers in the
relevant plugins for plugin-specific categories.
At this point, very few messages have been converted from the old
`rabbit_log*` API to the new macros. It can be done gradually when
working on a particular module or logging.
The Erlang builtin console/file handler, `logger_std_h`, has been forked
because it lacks date-based file rotation. The configuration of
date-based rotation is identical to Lager. Once the dust has settled for
this feature, the goal is to submit it upstream for inclusion in Erlang.
The forked module is called `rabbit_logger_std_h` and is based on
`logger_std_h` in Erlang 23.0.
The time this operation can take in clusters with a lot of classic
mirrored queues (say, 10s or 100s of thousands) can be prohibitive for
upgrades.
Upgrades can use a health check to ensure that there are in-sync
replicas before entering maintenance mode, in which case
the transfer is not really necessary.
All of the above is more obvious with the recent changes in #2749.
Detect when a new stream leader is elected and make stream_queues
re-send any unconfirmed, pending messages to ensure they did not get
lost during the leader change. This is done using the osiris
deduplication feature to ensure the resend does not create duplicates of
messages in the stream.
The connection may crash during the previous declaration, and a caught
error would then be returned by amqp_connection:open_channel/1, which wasn't
handled previously. Exactly how things fail in this test is most likely
very timing dependent and may vary.
Also fixes an mqtt test where the process that set up a mock auth ETS table
was transient; this became a problem when an rpc timeout was introduced.
Else an application that polled an empty quorum queue frequently using basic.get
would never result in a snapshot being taken, leading to unlimited
log growth.