Detect when a new stream leader is elected and make stream_queues
re-send any unconfirmed, pending messages to ensure they did not get
lost during the leader change. This is done using the osiris
deduplication feature to ensure the resend does not create duplicates of
messages in the stream.
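
The resend-plus-deduplication idea can be sketched with a toy model. This is an illustration only, not the osiris API: `ToyStream`, `resend_pending`, and the per-writer sequence-number scheme are hypothetical names and assumptions, and a single object stands in for both the old and the new leader.

```python
class ToyStream:
    """Toy stand-in for a stream leader; this is not the osiris API."""

    def __init__(self):
        self.log = []        # payloads in write order
        self.last_seq = {}   # writer id -> highest sequence number written

    def append(self, writer_id, seq, payload):
        # Deduplicate: drop anything at or below the highest sequence
        # number already written for this writer.
        if seq <= self.last_seq.get(writer_id, 0):
            return False
        self.log.append(payload)
        self.last_seq[writer_id] = seq
        return True


def resend_pending(stream, writer_id, pending):
    """Resend unconfirmed messages in sequence order; return how many were newly written."""
    return sum(stream.append(writer_id, seq, pending[seq]) for seq in sorted(pending))


stream = ToyStream()
stream.append("w1", 1, "a")   # written and confirmed
stream.append("w1", 2, "b")   # written, but the confirm was lost
pending = {2: "b", 3: "c"}    # message 3 never reached the old leader
newly_written = resend_pending(stream, "w1", pending)
```

In this sketch, resending message 2 is a no-op because it was already written, while message 3 is appended, so the resend is safe to repeat.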
This restores the behavior prior to the commit that made `rabbit` closer
to a standard Erlang application.
Plugins are still actually started after rabbit is started (because they
depend on the `rabbit` application). Only the execution of their boot
steps was moved earlier.
Restoring this behavior also means that a plugin's dependencies are not
yet started when its boot steps are executed.
V2: Move the maintenance mode reset before the plugin boot steps run.
V3: Add a `core_started` boot state. That state is reached at the end of
the `rabbit` app start function. It indicates when the RabbitMQ core
is started but the full service is not yet ready.
We now use this state in direct connection code to determine if
clients can open a direct connection. We have to do that because
some plugins open a direct connection as part of their own startup
(i.e. they can't wait for the `ready` boot state which comes later).
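
The gating described in V3 can be sketched as an ordered set of boot states. This is a hypothetical model, not the `rabbit` implementation: only `core_started` and `ready` come from the text above, while `BOOTING`, the numeric ordering, and `direct_connections_allowed` are assumptions made for illustration.

```python
from enum import IntEnum


class BootState(IntEnum):
    BOOTING = 1        # assumed name for any state before the core is up
    CORE_STARTED = 2   # core is started, full service not yet ready
    READY = 3          # full service is ready


def direct_connections_allowed(state):
    # Plugins may need a direct connection during their own startup,
    # so the gate opens at CORE_STARTED instead of waiting for READY.
    return state >= BootState.CORE_STARTED
```

With this ordering, a plugin that opens a direct connection while the node is in `CORE_STARTED` is accepted, whereas the same attempt during `BOOTING` is refused.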
Without this change, using anything other than `rabbit` or the `rabbitmq-env-conf.bat` file will result in `erlang_dist_running_with_unexpected_nodename`.
Follow-up to #2673
cc @dumbbell @michaelklishin
Currently, RABBITMQ_BASE is always picked up dynamically from the
environment. This change fixes its value at the time the service is
configured, which allows multiple RabbitMQ services to be configured.
Connections to the stream plugin do not have a type, so they can
trigger some function_clause errors. This was the case when trying to
close a connection from the rabbit_connection_tracking module. The
function now falls back to a simple gen_server call to the connection
process for connections without a type.
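
The fallback can be sketched as a dispatch table with a default. This is a hypothetical illustration and not the rabbit_connection_tracking code: the type names `network` and `direct`, the closer functions, and `ToyConnection` are all assumptions.

```python
class ToyConnection:
    """Hypothetical tracked connection; conn_type is None when no type is set."""

    def __init__(self, pid, conn_type=None):
        self.pid = pid
        self.type = conn_type


def close_network(conn, reason):
    return ("network_close", conn.pid, reason)


def close_direct(conn, reason):
    return ("direct_close", conn.pid, reason)


def close_generic(conn, reason):
    # Stand-in for a plain call to the connection process.
    return ("generic_call", conn.pid, reason)


# Assumed type names; typed connections keep their dedicated close path.
CLOSERS = {"network": close_network, "direct": close_direct}


def close_connection(conn, reason):
    # Connections without a known type fall back to the generic call
    # instead of failing to match any clause.
    return CLOSERS.get(conn.type, close_generic)(conn, reason)
```

Here a connection created without a type, such as `ToyConnection("pid-1")`, is closed through `close_generic` instead of raising an error.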
The connection may crash during the previous declaration, in which case
amqp_connection:open_channel/1 returns a caught error that was not
handled previously. Exactly how things fail in this test is most likely
very timing dependent and may vary.
Also fixes an MQTT test in which the process that set up a mock auth ETS
table was transient once an rpc timeout was introduced.
Otherwise, an application that frequently polled an empty quorum queue
using basic.get would never cause a snapshot to be taken, resulting in
unlimited log growth.