Prior to this commit, when the sending client overshot RabbitMQ's incoming-window
(which is allowed in the event of a cluster wide memory or disk alarm),
and RabbitMQ sent a FLOW frame to the client, RabbitMQ sent a negative
incoming-window field in the FLOW frame causing the following crash in
the writer proc:
```
crasher:
initial call: rabbit_amqp_writer:init/1
pid: <0.19353.0>
registered_name: []
exception error: bad argument
in function iolist_size/1
called as iolist_size([<<112,0,0,23,120>>,
[82,-15],
<<"pÿÿÿü">>,<<"pÿÿÿÿ">>,67,
<<112,0,0,23,120>>,
"Rª",64,64,64,64])
*** argument 1: not an iodata term
in call from amqp10_binary_generator:generate1/1 (amqp10_binary_generator.erl, line 141)
in call from amqp10_binary_generator:generate1/1 (amqp10_binary_generator.erl, line 88)
in call from amqp10_binary_generator:generate/1 (amqp10_binary_generator.erl, line 79)
in call from rabbit_amqp_writer:assemble_frame/3 (rabbit_amqp_writer.erl, line 206)
in call from rabbit_amqp_writer:internal_send_command_async/3 (rabbit_amqp_writer.erl, line 189)
in call from rabbit_amqp_writer:handle_cast/2 (rabbit_amqp_writer.erl, line 110)
in call from gen_server:try_handle_cast/3 (gen_server.erl, line 1121)
```
This commit fixes this crash by maintaning a floor of zero for
incoming-window in the FLOW frame.
Fixes#12816
The credit_flow between publishing AMQP 0.9.1 channel (or MQTT
connection) and (non-mirrored) classic queue processes was
unintentionally removed in 4.0 together with anything else related to
CQ mirroring.
By default we restore the 3.x behaviour for non-mirored classic
queues. It is possible to disable flow-control (the earlier 4.0.x
behaviour) with the new env `classic_queue_flow_control`. In 3.x this
was possible with the config `mirroring_flow_control`.
(cherry picked from commit d65bd7d07a)
[Why]
The code assumed that the transaction would always succeed. It was kind
of the case with Mnesia because it would throw an exception if it
failed.
Khepri returns an error instead. The code has to handle it. In
particular, we see timeouts in CI and before this patch, they caused a
crash because the list comprehension was asked to work on a tuple.
[How]
We now retry a few times for 10 seconds.
[Why]
We pin a version of Horus even if we don't use it directly (it is a
dependency of Khepri). But currently, we can't update Khepri while still
needing the fix in Horus 0.3.1.
Horus 0.3.1 works around a crash in `cover` that mostly affects CI for
now.
This pinning will have to go away with the next update of Khepri.
[Why]
The `ra:member_add/3` call returns before the change is committed. This
is ok for that addition but any follow-up changes to the cluster might
be rejected with the `cluster_change_not_permitted` error.
[How]
Instead of changing other places to wait or retry their cluster
membership change, this patch waits for the current add to be applied
before proceeding and returning.
This fixes some transient failures in CI where such follow-up changes
are rejected and not retried, leaving the cluster in an unexpected state
for the testcase.
An example is with
`quorum_queue_SUITE:force_shrink_member_to_current_member/1`
This check fails on a virin node, because the metadata store
is not yet ready to handle the query. However, a virin
node by definition can't have any queues, so let's just return
false without asking.
[Why]
In CI, we observe some timeouts in the Erlang distribution connections
between the temporary hidden node and the nodes it queries. This affects
peer discovery obviously.
[How]
We introduce some query retries to reduce the risk of an incomplete
query.
While here, we move the sorting of queried nodes from the
`query_node_props2/3` last clause (executed in the temporary hidden
node) to the function setting the temporary hidden node and asking for
these queries. This way the debug messages from that sorting are logged
by RabbitMQ out of the box.
[Why]
This impacts what is reported by the catch because it caught exceptions
emitted by code supposedly called later. An example is the assert
in `query_node_props2/3` last clause.
[Why]
This was the first solution put in place to prevent that the temporary
hidden node connects to the node that started it to write any printed
messages. Because of this, the nodes that the temporary hidden node
queried found out about the parent node and they opened an Erlang
distribution connection to it. This polluted the known nodes list.
However later, the temporary hidden node was started with the
`standard_io` connection option. This prevented the temporary hidden
node from knowing about the node that started it, solving the problem in
a cleaner way.
[How]
This commit garbage-collects that piece of code that is now useless. It
makes the query code way simpler to understand.
[Why]
That timer was started during boot and continued regardless if `rabbit`
was running or stopped.
This caused the reconsiliation to crash if the `rabbit` app was stopped
before the it ended because it tried to access the database even though
it was stopped or even reset.
[How]
We just check if `rabbit` is running before running one reconciliation
and scheduling a new one.
This fixes erlang_ls's header resolution. Previously it would confuse
the include_lib of the `khepri.hrl` from Khepri with this header in
the rabbit app.
This header is also specific to how rabbit uses Khepri so I think the
new name fits better.
Introduce a single place in the AMQP 1.0 Erlang client that infers the AMQP 1.0 type.
Erlang integers are inferred to be AMQP type `long` to avoid overflow surprises.
We don't expect random bytes to be there in the current
version of the message store as we overwrite empty spaces
with zeroes when moving messages around.
We also don't expect messages to be false flagged when
the broker is running because it checks for message
validity in the index. Therefore make sure message bodies
in the tests don't contain byte 255.
## What?
Prior to this commit, the `rabbitmq_event_exchange` internally published
always AMQP 0.9.1 messages to the `amq.rabbitmq.event` topic exchange.
This commit allows users to configure the plugin to publish AMQP 1.0
messages instead.
## Why?
Prior to this commit, when an AMQP 1.0 client consumed events,
event properties that are lists were omitted. For example property
`client_properties` of event `connection.created` or property
`arguments` of event `queue.created` were omitted because of the following sequence:
1. The event exchange plugins listens for all kind of internal events.
2. The event exchange plugin re-publishes all events as AMQP 0.9.1 message to the event exchange.
3. Later, when an AMQP 1.0 client consumes this message, the broker must translate the message from AMQP 0.9.1 to AMQP 1.0.
4. This translation follows the rules outlined in https://www.rabbitmq.com/docs/conversions#amqpl-amqp
5. Specifically, in this table the row before the last one describes the rule we're hitting here. It says that if the AMQP 0.9.1
header value is not an `x-` prefixed header and its value is an array or table, then this header is not converted.
That's because AMQP 1.0 application-properties must be simple types as mandated in https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-application-properties
## How?
The user can configure the plugin as follows to have the plugin
internally publish AMQP 1.0 messages:
```
event_exchange.protocol = amqp_1_0
```
To support complex types such as lists, the plugin sets all event
properties as AMQP 1.0 message-annotations. The plugin prefixes all message
annotation keys with `x-opt-` to comply with the AMQP 1.0 spec.
## Alternative Design
An alternative design would have been to format all event properties
e.g. as JSON within the message body. However, this breaks routing on
specific event property values via a headers exchange.
## Documentation
https://github.com/rabbitmq/rabbitmq-website/pull/2129