This is mostly the same as the `messages_total` property test but checks
that the Raft indexes in `ra_indexes` are the set of the indexes checked
out by all consumers union any indexes in the `returns` queue. This is
the intended state of `ra_indexes` and failing this condition could
cause bugs that would prevent snapshotting.
The AMQP spec defines:
```
<field name="durable" type="boolean" default="false"/>
```
RabbitMQ 4.0 and 4.1 interpret the durable field as true if not set.
The idea was to favour safety over performance.
This complies with the AMQP spec because the spec allows other target or
node specific defaults for the durable field:
> If the header section is omitted the receiver MUST assume the appropriate
> default values (or the meaning implied by no value being set) for the fields
> within the header unless other target or node specific defaults have otherwise
> been set.
However, some client libraries completely omit the header section if the
app expliclity sets durable=false. This complies with the spec, but it
means that RabbitMQ cannot diffentiate between "client app forgot to set
the durable field" vs "client lib opted in for an optimisation omitting
the header section".
This is problematic with JMS message selectors where JMS apps can filter
on JMSDeliveryMode. To be able to correctly filter on JMSDeliveryMode,
RabbitMQ needs to know whether the JMS app sent the message as
PERSISTENT or NON_PERSISTENT.
Rather than relying on client libs to always send the header section
including the durable field, this commit makes RabbitMQ comply with the
default value for durable in the AMQP spec.
Some client lib maintainers accepted to send the header section, while
other maintainers refused to do so:
https://github.com/Azure/go-amqp/issues/330https://issues.apache.org/jira/browse/QPIDJMS-608
Likely the AMQP spec was designed to omit the header section when
performance is important, as is the case with durable=false. Omitting
the header section means saving a few bytes per message on the wire and
some marshalling and unmarshalling overhead on both client and server.
Therefore, it's better to push the "safe by default" behaviour from the broker
back to the client libs. Client libs should send messages as durable by
default unless the client app expliclity opts in to send messages as
non-durable. This is also what JMS does: By default JMS apps send
messages as PERSISTENT:
> The message producer's default delivery mode is PERSISTENT.
Therefore, this commit also makes the AMQP Erlang client send messages as
durable, by default.
This commit will apply to RabbitMQ 4.2.
It's arguably not a breaking change because in RabbitMQ, message durability
is actually more determined by the queue type the message is sent to rather than the
durable field of the message:
* Quroum queues and streams store messages durably (fsync or replicate)
no matter what the durable field is
* MQTT QoS 0 queues hold messages in memory no matter what the
durable field is
* Classic queues do not fsync even if the durable field is set to true
In addition, the RabbitMQ AMQP Java library introduced in RabbitMQ 4.0 sends messages with
durable=true:
53e3dd6abb/src/main/java/com/rabbitmq/client/amqp/impl/AmqpPublisher.java (L91)
The tests for selecting messages by JMSDeliveryMode relying on the
behaviour in this commit can be found on the `jms` branch.
revive_local_queue_replicas -> revive_local_queue_members
can_redeliver converted from callback to capabilities key
rebalance_moduled converted from callback to capabilities key
At CQ startup variable_queue went through each seqid from 0 to
next_seq_id looking for the first message even if there were no
messages in the queue (no segment files).
In case of a clean shutdown the value next_seq_id is stored in
recovery terms. This value can be utilized by the queue index to
provide better seqid bounds in absence of segment files.
Before this patch starting an empty classic queue with next_seq_id =
100_000_000 used to take about 26 seconds. With this patch it takes
less than 1ms.
[Why]
They were moved from `rabbit` to `rabbit_common` several years ago to
solve an dependency issue because `amqp_client` depended on the file
handle cache. This is not the case anymore.
[How]
The modules are moved back to `rabbit`.
`rabbit_common` doesn't need to depend on `os_mon` anymore. `rabbit`
already depends on it, so no changes needed here.
`include/rabbit_memory.hrl` and some test cases are moved as well to
follow the `vm_memory_monitor` module.
Consumers with a same name, consuming from the same stream should have
the same partition index. This commit adds a check to enforce this rule
and make the subscription fail if it does not comply.
Fixes#13835
Vhosts that currently don't have their own default queue type, now
inherit it from the node configuration and store it in their metadata
going forward.
The correct place for the `default_queue_type` property
is inside the `metadata` block. However, right now we'd
always export the value outside of `metadata` AND only
export it inside `metadata`, if it was not `undefined`.
This value outside of `metadata` was just misleading:
if a user exported the definitins from a fresh node,
changed `classic` to `quorum` and imported such modified
values, the DQT would still be `classic`, because RMQ looks
for the value inside `metadata`. Just to make it more confusing,
if the DQT was changed successfully one way or another, the
value outside of `metadata` would reflect that
(it always shows the correct value, but is ignored on import).
[Why]
The `rabbit_khepri` module grew during the work to add Khepri support to
RabbitMQ and while Khepri was itself written. The current code is
therefore unorganized.
[How]
This commit tries to change proxy functions to be close to their Khepri
equivalent.
The module continues to set non-default options for write functions. We
also add the variants that take an option map to be consistent and not
have to deal with that in the future.
Several legacy functions were removed, either because they were no
longer called or because they were replace by a regular Khepri call.
[Why]
The `rabbit_khepri` module grew during the work to add Khepri support to
RabbitMQ and while Khepri was itself written. The current code is
therefore unorganized.
[How]
This commit tries to sort the code that manages the setup of Khepri and
the functions tha deal with the Khepri cluster. It also groups functions
which provide support for CLI commands.
It also adds documentation to several functions.
Finally, when a node joins a cluster, we stop displaying the content of
the Khepri tree.
It's a tradeoff between building the map for each incoming and outgoing
message (now that there are also outgoing interceptors) vs increased
memory usage for the MQTT proc state.
Connecting with MQTT 5.0 and client ID "xxxxxxxx", the number of words
are 201 before this commit vs 235 after this commit as determined by:
```
S = sys:get_state(MQTTConnectionPid),
erts_debug:size(S).
```
Therefore, this commit requires 34 word * 8 bytes = 272 bytes more per MQTT
connection, that is 272 MB more for 1,000,000 MQTT connections.
1. Force the config for timestamp and routing node message interceptors
to be configured with the overwrite boolean() to avoid defining
multiple default values throughout the code.
2. Add type specs
3. Extend existing test case for new MQTT client ID interceptor
4. routing node and timestamp should only set the annotation for
incoming_message_interceptors group
5. Fix `rabbitmq.conf`.
Prior to this commit there were several issue:
a.) Setting the right configuration was too user unfriendly, e.g. the user has to set
```
message_interceptor.incoming.rabbit_mqtt_message_interceptor_client_id.annotation_key = x-opt-mqtt-client-id
```
just to enable the MQTT message interceptor.
b.) The code that parses was too difficult to understand
c.) MQTT plugin was setting the env for app rabbit, which is an anti-pattern
d.) disabling a plugin (e.g. MQTT), left its message interceptors still in place
This is now all fixed, the user sets the rabbitmq.conf as follows:
```
message_interceptors.incoming.set_header_timestamp.overwrite = true
message_interceptors.incoming.set_header_routing_node.overwrite = false
mqtt.message_interceptors.incoming.set_client_id_annotation.enabled = true
```
Note that the first two lines use the same format as for RabbitMQ 4.0
for backwards compatiblity. The last line (MQTT) follows a similar
pattern.
This commit enables users to provide custom message interceptor modules,
i.e. modules to process incoming and outgoing messages. The
`rabbit_message_interceptor` behaviour defines a `intercept/4` callback,
for those modules to implement.
Co-authored-by: Péter Gömöri <gomoripeti@users.noreply.github.com>
[Why]
The intent is to have it stable and enabled by default for new
deployment in RabbitMQ 4.1.x.
To prepare for this goal, it is time to mark the feature flag as stable
to let us iron out the library and its integration into RabbitMQ.
This is not a commitment at this stage: we will revisit this near the
beginning of the release cycle and commit to it or revert to
experimental.
When debug logging is enabled, we log something at each log level
to test if logs are emitted. I don't think this is particularly useful,
but it's certainly annoying, because I constatnly need to filter
out these logs when searching if any errors happened during tests.
This auth backend behaves the same as the internal backend provided in
the core broker, but it only accepts loopback connections. External
connection attempts will receive an error.
This ensures that quorum_queues shuts down _before_
coordination where khepri run inside.
Quorum queues depend on khepri so need to be shut down first.
[Why]
The feature flag enable function is called during the initial migration
or when a node is later added to a cluster.
In this latter situation, the cluster is already formed and the Mnesia
tables were already migrated. Syncing the cluster in this specific
situation might kick another node that is currently unreachable.
[How]
If the node running the enable function is already clustered, we skip
the cluster sync.
[Why]
All callers of `khepri_adv` and `khepri_tx_adv` need updates to handle
the now uniform return type of `khepri:node_props_map()` in Khepri
0.17.0.
[How]
We don't need any compatibility code to handle "either the old return
type or the new return type" from the khepri_adv API because the
translation is done entirely in the "client side" code in Khepri -
meaning that the return value from the Ra server is the same but it is
translated differently by the functions in `khepri_adv`.
However, we need to adapt transaction functions because they may be
executed on different versions of Khepri and the behaviour of
`khepri_tx_adv` can be different. To take the possible change of return
value format, we use the new `khepri_tx:does_api_comply_with/1` to know
what to expect.
[Why]
In Khepri 0.17.0, `khepri_cluster:locally_known_members/1` and
`khepri_cluster:locally_known_node/1` were replaced with
`khepri_cluster:members/2` and `khepri_cluster:nodes/2` with `favor` set
to `low_latency` - this matches the interface for queries in Khepri.
Move leader repair earlier in tick function to ensure more
timely update of meta data store record after leader change.
Also use RPC_TIMEOUT macro for metric/stats multicalls to improve
liveness when a node is connected but partitioned / frozen.
## What?
This commit determines the queue topology without checking the queue type.
## Why?
This way, checking leader and replicas works the same across all queue
types without the need to introduce other rabbit_queue_type behaviour as
suggested in other PRs.
## How?
pid is the leader, nodes in queue_type_states are the members/replicas.
This commit results in an unknown stream leader during queue
declaration. However the correct leader will be returned eventually when
calling GET on the stream.
The same connection can contain several consumers belonging to a SAC
group (group key = vhost + stream + consumer name). The whole new group
must be re-evaluated to select a new active consumer after the consumers
of the down connection are removed from it.
The previous behavior would not re-evaluate the new group and could
select a consumer from the down connection, letting the group with only
inactive consumers, as the selected active consumer would never receive
the activation message from the stream SAC coordinator.
This commit fixes this problem by removing the consumers of the down
down connection from the affected groups and then performing the
appropriate operations for the groups to keep on consuming (e.g.
notifying an active consumer that it needs to step down).
References #13372
Ra improvements:
* Don't allow a non-voter to start elections
* Register with ra directory before initialising ra server.
* Trigger tick_timeout immediately after entering leader state.
* Set a configurable segment max size
This commit also includes a change to turn the quorum queue
become leader callback to become a noop and instead rely on
the more promptly tick_handler to handle the meta data store
update after a leader election.
This more prompt tick update means there should be a much shorter
gap between the queue metrics being deleted from the old leader
node to them being available again on the new node resulting
in smoother message count metrics.
Fix test that relied on waiting on too simplistic a property
before asserting.
Prior to this commit, when a client consumed from an unavailable quorum
queue, the following crash occurred:
```
{badmatch,{error,noproc}}
[{rabbit_quorum_queue,consume,3,[{file,\"rabbit_quorum_queue.erl\"},{line,993}]}
```
This commit fixes this bug by returning any error when registering a
quorum queue consumer to rabbit_queue_type.
This commit also refactors errors returned by
rabbit_queue_type:consume/3 to simplify and ensure seperation of
concerns.
For example prior to this commit, the channel did error
formatting specifically for consuming from streams. It's better if
the channel is unaware of what queue type it consumes from and have each
queue type implementation format their own errors.
Bump the timeout for management operations and link attachments from 20s
to 30s. We've seen timeouts in CI.
We bump the poll interval of the `?awaitMatch` macro because CI
sometimes flaked by crashing in
0e803de6dd/deps/rabbitmq_amqp_client/src/rabbitmq_amqp_client.erl (L411)
which indicates that the client lib received a response from a previous
request.