This configuration is not guaranteed to be safe to change after a stream has bee n
declared and thus we'll remove the ability to change it after the initial
declaration. Users should favour the x- queue arg for this config but it will still
be possible to configure it as a policy but it will only be evaluated at
declara tion time.
This means that if a policy is set for a stream that re-configures the
`stream-m ax-segment-size-bytes` key it will show in the UI as updated but
the pre-existing stream will not use the updated configuration.
The key has been removed from the UI but for backwards compatibility it is still
settable.
NB: this PR adds a new command `update_config` to the stream coordinator state
machine. Strictly speaking this should require a new machine version but we're by
passing that by relying on the feature flag instead which avoids this command
being committed before all nodes have the new code version. A new machine version
can lower the availability properties during a rolling cluster upgrade so in
this case it is preferable to avoid that given the simplicity of the change.
Per discussion in #10415, this introduces a new module,
rabbit_mgmt_nodes, which provides a couple of helpers
that can be used to implement Cowboy REST's
resource_exists/2 in the modules that return
information about cluster members.
`rabbit_mgmt_util:direct_request/6` is always called with an
`ErrorMsg` which expects one format argument as a string. Convert the
arbitrary reason term into a string to avoid a crash like the below:
```
warning: FORMATTER CRASH: {"Delete exchange error: ~ts",[{'EXIT',{{badmatch,{error,...
```
This prevents the below harmless crash when multiple parallel API
requests arrive soon after starting the node.
```
exception error: no match of right hand side value
{error,{already_started,<0.1593.0>}}
in function rabbit_mgmt_db_cache:fetch/4 (rabbit_mgmt_db_cache.erl, line 68)
in call from rabbit_mgmt_db:submit_cached/4 (rabbit_mgmt_db.erl, line 756)
in call from rabbit_mgmt_util:augment/2 (rabbit_mgmt_util.erl, line 412)
in call from rabbit_mgmt_util:run_augmentation/2 (rabbit_mgmt_util.erl, line 389)
in call from rabbit_mgmt_util:augment_resources0/6 (rabbit_mgmt_util.erl, line 378)
in call from rabbit_mgmt_util:with_valid_pagination/3 (rabbit_mgmt_util.erl, line 302)
in call from rabbit_mgmt_wm_queues:to_json/2 (rabbit_mgmt_wm_queues.erl, line 44)
in call from cowboy_rest:call/3 (src/cowboy_rest.erl, line 1583)
```
Avoid the following crash
```
** Reason for termination ==
** {mqtt_unexpected_cast,{shutdown,"Closed via management plugin"}}
crasher:
initial call: rabbit_mqtt_reader:init/1
pid: <0.1096.0>
registered_name: []
exception exit: {mqtt_unexpected_cast,
{shutdown,"Closed via management plugin"}}
in function gen_server:handle_common_reply/8 (gen_server.erl, line 1208)
```
when closing MQTT or Stream connections via HTTP API endpoint
```
/connections/username/:username
```
Scan queues, exchanges and bindings before attempting
to import anything on boot. If they miss the virtual
host field, fail early and log a sensible message.
Listing queues with the HTTP API when there are many (1000s) of
quorum queues could be excessively slow compared to the same scenario
with classic queues.
This optimises various aspects of HTTP API queue listings.
For QQs it removes the expensive cluster wide rpcs used to get the
"online" status of each quorum queue. This was previously done _before_
paging and thus would perform a cluster-wide query for _each_ quorum queue in
the vhost/system. This accounted for most of the slowness compared to
classic queues.
Secondly the query to separate the running from the down queues
consisted of two separate queries that later were combined when a single
query would have sufficed.
This commit also includes a variety of other improvements and minor
fixes discovered during testing and optimisation.
MINOR BREAKING CHANGE: quorum queues would previously only display one
of two states: running or down. Now there is a new state called minority
which is emitted when the queue has at least one member running but
cannot commit entries due to lack of quorum.
Also the quorum queue may transiently enter the down state when a node
goes down and before its elected a new leader.
Introduce GET /api/queues/detailed endpoint
Just removed garbage_collection, idle_since and any 'null' value
/api/queues with 10k classic queues returns 7.4MB of data
/api/queues/detailed with 10k classic queues returns 11MB of data
This sits behind a new feature flag, required to collect data from
all nodes: detailed_queues_endpoint
The default is 20 MiB, which is enough to upload
a definition file with 200K queues, a few virtual host
and a few users. In other words, it should accomodate
a lot of environments.
Because both `add_member` and `grow` default to Membership status `promotable`,
new members will have to catch up before they are considered cluster members.
This can be overridden with either `voter` or (permanent `non_voter` statuses.
The latter one is useless without additional tooling so kept undocumented.
- non-voters do not affect quorum size for election purposes
- `observer_cli` reports their status with lowercase 'f'
- `rabbitmq-queues check_if_node_is_quorum_critical` takes voter status into
account
A previous PR removed backing_queue_status as it is mostly unused,
but classic queue version is still useful. This PR returns version
as a top-level key in queue objects.
[Why]
Mnesia is a very powerful and convenient tool for Erlang applications:
it is a persistent disc-based database, it handles replication accross
multiple Erlang nodes and it is available out-of-the-box from the
Erlang/OTP distribution. RabbitMQ relies on Mnesia to manage all its
metadata:
* virtual hosts' properties
* intenal users
* queue, exchange and binding declarations (not queues data)
* runtime parameters and policies
* ...
Unfortunately Mnesia makes it difficult to handle network partition and,
as a consequence, the merge conflicts between Erlang nodes once the
network partition is resolved. RabbitMQ provides several partition
handling strategies but they are not bullet-proof. Users still hit
situations where it is a pain to repair a cluster following a network
partition.
[How]
@kjnilsson created Ra [1], a Raft consensus library that RabbitMQ
already uses successfully to implement quorum queues and streams for
instance. Those queues do not suffer from network partitions.
We created Khepri [2], a new persistent and replicated database engine
based on Ra and we want to use it in place of Mnesia in RabbitMQ to
solve the problems with network partitions.
This patch integrates Khepri as an experimental feature. When enabled,
RabbitMQ will store all its metadata in Khepri instead of Mnesia.
This change comes with behavior changes. While Khepri remains disabled,
you should see no changes to the behavior of RabbitMQ. If there are
changes, it is a bug. After Khepri is enabled, there are significant
changes of behavior that you should be aware of.
Because it is based on the Raft consensus algorithm, when there is a
network partition, only the cluster members that are in the partition
with at least `(Number of nodes in the cluster ÷ 2) + 1` number of nodes
can "make progress". In other words, only those nodes may write to the
Khepri database and read from the database and expect a consistent
result.
For instance in a cluster of 5 RabbitMQ nodes:
* If there are two partitions, one with 3 nodes, one with 2 nodes, only
the group of 3 nodes will be able to write to the database.
* If there are three partitions, two with 2 nodes, one with 1 node, none
of the group can write to the database.
Because the Khepri database will be used for all kind of metadata, it
means that RabbitMQ nodes that can't write to the database will be
unable to perform some operations. A list of operations and what to
expect is documented in the associated pull request and the RabbitMQ
website.
This requirement from Raft also affects the startup of RabbitMQ nodes in
a cluster. Indeed, at least a quorum number of nodes must be started at
once to allow nodes to become ready.
To enable Khepri, you need to enable the `khepri_db` feature flag:
rabbitmqctl enable_feature_flag khepri_db
When the `khepri_db` feature flag is enabled, the migration code
performs the following two tasks:
1. It synchronizes the Khepri cluster membership from the Mnesia
cluster. It uses `mnesia_to_khepri:sync_cluster_membership/1` from
the `khepri_mnesia_migration` application [3].
2. It copies data from relevant Mnesia tables to Khepri, doing some
conversion if necessary on the way. Again, it uses
`mnesia_to_khepri:copy_tables/4` from `khepri_mnesia_migration` to do
it.
This can be performed on a running standalone RabbitMQ node or cluster.
Data will be migrated from Mnesia to Khepri without any service
interruption. Note that during the migration, the performance may
decrease and the memory footprint may go up.
Because this feature flag is considered experimental, it is not enabled
by default even on a brand new RabbitMQ deployment.
More about the implementation details below:
In the past months, all accesses to Mnesia were isolated in a collection
of `rabbit_db*` modules. This is where the integration of Khepri mostly
takes place: we use a function called `rabbit_khepri:handle_fallback/1`
which selects the database and perform the query or the transaction.
Here is an example from `rabbit_db_vhost`:
* Up until RabbitMQ 3.12.x:
get(VHostName) when is_binary(VHostName) ->
get_in_mnesia(VHostName).
* Starting with RabbitMQ 3.13.0:
get(VHostName) when is_binary(VHostName) ->
rabbit_khepri:handle_fallback(
#{mnesia => fun() -> get_in_mnesia(VHostName) end,
khepri => fun() -> get_in_khepri(VHostName) end}).
This `rabbit_khepri:handle_fallback/1` function relies on two things:
1. the fact that the `khepri_db` feature flag is enabled, in which case
it always executes the Khepri-based variant.
4. the ability or not to read and write to Mnesia tables otherwise.
Before the feature flag is enabled, or during the migration, the
function will try to execute the Mnesia-based variant. If it succeeds,
then it returns the result. If it fails because one or more Mnesia
tables can't be used, it restarts from scratch: it means the feature
flag is being enabled and depending on the outcome, either the
Mnesia-based variant will succeed (the feature flag couldn't be enabled)
or the feature flag will be marked as enabled and it will call the
Khepri-based variant. The meat of this function really lives in the
`khepri_mnesia_migration` application [3] and
`rabbit_khepri:handle_fallback/1` is a wrapper on top of it that knows
about the feature flag.
However, some calls to the database do not depend on the existence of
Mnesia tables, such as functions where we need to learn about the
members of a cluster. For those, we can't rely on exceptions from
Mnesia. Therefore, we just look at the state of the feature flag to
determine which database to use. There are two situations though:
* Sometimes, we need the feature flag state query to block because the
function interested in it can't return a valid answer during the
migration. Here is an example:
case rabbit_khepri:is_enabled(RemoteNode) of
true -> can_join_using_khepri(RemoteNode);
false -> can_join_using_mnesia(RemoteNode)
end
* Sometimes, we need the feature flag state query to NOT block (for
instance because it would cause a deadlock). Here is an example:
case rabbit_khepri:get_feature_state() of
enabled -> members_using_khepri();
_ -> members_using_mnesia()
end
Direct accesses to Mnesia still exists. They are limited to code that is
specific to Mnesia such as classic queue mirroring or network partitions
handling strategies.
Now, to discover the Mnesia tables to migrate and how to migrate them,
we use an Erlang module attribute called
`rabbit_mnesia_tables_to_khepri_db` which indicates a list of Mnesia
tables and an associated converter module. Here is an example in the
`rabbitmq_recent_history_exchange` plugin:
-rabbit_mnesia_tables_to_khepri_db(
[{?RH_TABLE, rabbit_db_rh_exchange_m2k_converter}]).
The converter module — `rabbit_db_rh_exchange_m2k_converter` in this
example — is is fact a "sub" converter module called but
`rabbit_db_m2k_converter`. See the documentation of a `mnesia_to_khepri`
converter module to learn more about these modules.
[1] https://github.com/rabbitmq/ra
[2] https://github.com/rabbitmq/khepri
[3] https://github.com/rabbitmq/khepri_mnesia_migration
See #7206.
Co-authored-by: Jean-Sébastien Pédron <jean-sebastien@rabbitmq.com>
Co-authored-by: Diana Parra Corbacho <dparracorbac@vmware.com>
Co-authored-by: Michael Davis <mcarsondavis@gmail.com>
Using GET /api/queues?disable_stats=true&enable_queue_totals=true is far more efficient than the standard GET /api/queues and in many cases will suffice for monitoring and operating purposes.
- Separate pure management ui suites from authnz
- Run full management ui suite on every commit to main or
release brances
- Fun full management ui suite on every change done to
rabbitmq_management plugin on any PR
This PR implements an approach for a "protocol (data format) agnostic core" where the format of the message isn't converted at point of reception.
Currently all non AMQP 0.9.1 originating messages are converted into a AMQP 0.9.1 flavoured basic_message record before sent to a queue. If the messages are then consumed by the originating protocol they are converted back from AMQP 0.9.1. For some protocols such as MQTT 3.1 this isn't too expensive as MQTT is mostly a fairly easily mapped subset of AMQP 0.9.1 but for others such as AMQP 1.0 the conversions are awkward and in some cases lossy even if consuming from the originating protocol.
This PR instead wraps all incoming messages in their originating form into a generic, extensible message container type (mc). The container module exposes an API to get common message details such as size and various properties (ttl, priority etc) directly from the source data type. Each protocol needs to implement the mc behaviour such that when a message originating form one protocol is consumed by another protocol we convert it to the target protocol at that point.
The message container also contains annotations, dead letter records and other meta data we need to record during the lifetime of a message. The original protocol message is never modified unless it is consumed.
This includes conversion modules to and from amqp, amqpl (AMQP 0.9.1) and mqtt.
COMMIT HISTORY:
* Refactor away from using the delivery{} record
In many places including exchange types. This should make it
easier to move towards using a message container type instead of
basic_message.
Add mc module and move direct replies outside of exchange
Lots of changes incl classic queues
Implement stream support incl amqp conversions
simplify mc state record
move mc.erl
mc dlx stuff
recent history exchange
Make tracking work
But doesn't take a protocol agnostic approach as we just convert
everything into AMQP legacy and back. Might be good enough for now.
Tracing as a whole may want a bit of a re-vamp at some point.
tidy
make quorum queue peek work by legacy conversion
dead lettering fixes
dead lettering fixes
CMQ fixes
rabbit_trace type fixes
fixes
fix
Fix classic queue props
test assertion fix
feature flag and backwards compat
Enable message_container feature flag in some SUITEs
Dialyzer fixes
fixes
fix
test fixes
Various
Manually update a gazelle generated file
until a gazelle enhancement can be made
https://github.com/rabbitmq/rules_erlang/issues/185
Add message_containers_SUITE to bazel
and regen bazel files with gazelle from rules_erlang@main
Simplify essential proprty access
Such as durable, ttl and priority by extracting them into annotations
at message container init time.
Move type
to remove dependenc on amqp10 stuff in mc.erl
mostly because I don't know how to make bazel do the right thing
add more stuff
Refine routing header stuff
wip
Cosmetics
Do not use "maybe" as type name as "maybe" is a keyword since OTP 25
which makes Erlang LS complain.
* Dedup death queue names
* Fix function clause crashes
Fix failing tests in the MQTT shared_SUITE:
A classic queue message ID can be undefined as set in
fbe79ff47b/deps/rabbit/src/rabbit_classic_queue_index_v2.erl (L1048)
Fix failing tests in the MQTT shared_SUITE-mixed:
When feature flag message_containers is disabled, the
message is not an #mc{} record, but a #basic_message{} record.
* Fix is_utf8_no_null crash
Prior to this commit, the function crashed if invalid UTF-8 was
provided, e.g.:
```
1> rabbit_misc:is_valid_shortstr(<<"😇"/utf16>>).
** exception error: no function clause matching rabbit_misc:is_utf8_no_null(<<216,61,222,7>>) (rabbit_misc.erl, line 1481)
```
* Implement mqtt mc behaviour
For now via amqp translation.
This is still work in progress, but the following SUITEs pass:
```
make -C deps/rabbitmq_mqtt ct-shared t=[mqtt,v5,cluster_size_1] FULL=1
make -C deps/rabbitmq_mqtt ct-v5 t=[mqtt,cluster_size_1] FULL=1
```
* Shorten mc file names
Module name length matters because for each persistent message the #mc{}
record is persisted to disk.
```
1> iolist_size(term_to_iovec({mc, rabbit_mc_amqp_legacy})).
30
2> iolist_size(term_to_iovec({mc, mc_amqpl})).
17
```
This commit renames the mc modules:
```
ag -l rabbit_mc_amqp_legacy | xargs sed -i 's/rabbit_mc_amqp_legacy/mc_amqpl/g'
ag -l rabbit_mc_amqp | xargs sed -i 's/rabbit_mc_amqp/mc_amqp/g'
ag -l rabbit_mqtt_mc | xargs sed -i 's/rabbit_mqtt_mc/mc_mqtt/g'
```
* mc: make deaths an annotation + fixes
* Fix mc_mqtt protocol_state callback
* Fix test will_delay_node_restart
```
make -C deps/rabbitmq_mqtt ct-v5 t=[mqtt,cluster_size_3]:will_delay_node_restart FULL=1
```
* Bazel run gazelle
* mix format rabbitmqctl.ex
* Ensure ttl annotation is refelected in amqp legacy protocol state
* Fix id access in message store
* Fix rabbit_message_interceptor_SUITE
* dializer fixes
* Fix rabbit:rabbit_message_interceptor_SUITE-mixed
set_annotation/3 should not result in duplicate keys
* Fix MQTT shared_SUITE-mixed
Up to 3.12 non-MQTT publishes were always QoS 1 regardless of delivery_mode.
75a953ce28/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl (L2075-L2076)
From now on, non-MQTT publishes are QoS 1 if durable.
This makes more sense.
The MQTT plugin must send a #basic_message{} to an old node that does
not understand message containers.
* Field content of 'v1_0.data' can be binary
Fix
```
bazel test //deps/rabbitmq_mqtt:shared_SUITE-mixed \
--test_env FOCUS="-group [mqtt,v4,cluster_size_1] -case trace" \
-t- --test_sharding_strategy=disabled
```
* Remove route/2 and implement route/3 for all exchange types.
This removes the route/2 callback from rabbit_exchange_type and
makes route/3 mandatory instead. This is a breaking change and
will require all implementations of exchange types to update their
code, however this is necessary anyway for them to correctly handle
the mc type.
stream filtering fixes
* Translate directly from MQTT to AMQP 0.9.1
* handle undecoded properties in mc_compat
amqpl: put clause in right order
recover death deatails from amqp data
* Replace callback init_amqp with convert_from
* Fix return value of lists:keyfind/3
* Translate directly from AMQP 0.9.1 to MQTT
* Fix MQTT payload size
MQTT payload can be a list when converted from AMQP 0.9.1 for example
First conversions tests
Plus some other conversion related fixes.
bazel
bazel
translate amqp 1.0 null to undefined
mc: property/2 and correlation_id/message_id return type tagged values.
To ensure we can support a variety of types better.
The type type tags are AMQP 1.0 flavoured.
fix death recovery
mc_mqtt: impl new api
Add callbacks to allow protocols to compact data before storage
And make readable if needing to query things repeatedly.
bazel fix
* more decoding
* tracking mixed versions compat
* mc: flip default of `durable` annotation to save some data.
Assuming most messages are durable and that in memory messages suffer less
from persistence overhead it makes sense for a non existent `durable`
annotation to mean durable=true.
* mc conversion tests and tidy up
* mc make x_header unstrict again
* amqpl: death record fixes
* bazel
* amqp -> amqpl conversion test
* Fix crash in mc_amqp:size/1
Body can be a single amqp-value section (instead of
being a list) as shown by test
```
make -C deps/rabbitmq_amqp1_0/ ct-system t=java
```
on branch native-amqp.
* Fix crash in lists:flatten/1
Data can be a single amqp-value section (instead of
being a list) as shown by test
```
make -C deps/rabbitmq_amqp1_0 ct-system t=dotnet:roundtrip_to_amqp_091
```
on branch native-amqp.
* Fix crash in rabbit_writer
Running test
```
make -C deps/rabbitmq_amqp1_0 ct-system t=dotnet:roundtrip_to_amqp_091
```
on branch native-amqp resulted in the following crash:
```
crasher:
initial call: rabbit_writer:enter_mainloop/2
pid: <0.711.0>
registered_name: []
exception error: bad argument
in function size/1
called as size([<<0>>,<<"Sw">>,[<<160,2>>,<<"hi">>]])
*** argument 1: not tuple or binary
in call from rabbit_binary_generator:build_content_frames/7 (rabbit_binary_generator.erl, line 89)
in call from rabbit_binary_generator:build_simple_content_frames/4 (rabbit_binary_generator.erl, line 61)
in call from rabbit_writer:assemble_frames/5 (rabbit_writer.erl, line 334)
in call from rabbit_writer:internal_send_command_async/3 (rabbit_writer.erl, line 365)
in call from rabbit_writer:handle_message/2 (rabbit_writer.erl, line 265)
in call from rabbit_writer:handle_message/3 (rabbit_writer.erl, line 232)
in call from rabbit_writer:mainloop1/2 (rabbit_writer.erl, line 223)
```
because #content.payload_fragments_rev is currently supposed to
be a flat list of binaries instead of being an iolist.
This commit fixes this crash inefficiently by calling
iolist_to_binary/1. A better solution would be to allow AMQP legacy's #content.payload_fragments_rev
to be an iolist.
* Add accidentally deleted line back
* mc: optimise mc_amqp internal format
By removint the outer records for message and delivery annotations
as well as application properties and footers.
* mc: optimis mc_amqp map_add by using upsert
* mc: refactoring and bug fixes
* mc_SUITE routingheader assertions
* mc remove serialize/1 callback as only used by amqp
* mc_amqp: avoid returning a nested list from protocol_state
* test and bug fix
* move infer_type to mc_util
* mc fixes and additiona assertions
* Support headers exchange routing for MQTT messages
When a headers exchange is bound to the MQTT topic exchange, routing
will be performend based on both MQTT topic (by the topic exchange) and
MQTT User Property (by the headers exchange).
This combines the best worlds of both MQTT 5.0 and AMQP 0.9.1 and
enables powerful routing topologies.
When the User Property contains the same name multiple times, only the
last name (and value) will be considered by the headers exchange.
* Fix crash when sending from stream to amqpl
When publishing a message via the stream protocol and consuming it via
AMQP 0.9.1, the following crash occurred prior to this commit:
```
crasher:
initial call: rabbit_channel:init/1
pid: <0.818.0>
registered_name: []
exception exit: {{badmatch,undefined},
[{rabbit_channel,handle_deliver0,4,
[{file,"rabbit_channel.erl"},
{line,2728}]},
{lists,foldl,3,[{file,"lists.erl"},{line,1594}]},
{rabbit_channel,handle_cast,2,
[{file,"rabbit_channel.erl"},
{line,728}]},
{gen_server2,handle_msg,2,
[{file,"gen_server2.erl"},{line,1056}]},
{proc_lib,wake_up,3,
[{file,"proc_lib.erl"},{line,251}]}]}
```
This commit first gives `mc:init/3` the chance to set exchange and
routing_keys annotations.
If not set, `rabbit_stream_queue` will set these annotations assuming
the message was originally published via the stream protocol.
* Support consistent hash exchange routing for MQTT 5.0
When a consistent hash exchange is bound to the MQTT topic exchange,
MQTT 5.0 messages can be routed to queues consistently based on the
Correlation-Data in the PUBLISH packet.
* Convert MQTT 5.0 User Property
* to AMQP 0.9.1 headers
* from AMQP 0.9.1 headers
* to AMQP 1.0 application properties and message annotations
* from AMQP 1.0 application properties and message annotations
* Make use of Annotations in mc_mqtt:protocol_state/2
mc_mqtt:protocol_state/2 includes Annotations as parameter.
It's cleaner to make use of these Annotations when computing the
protocol state instead of relying on the caller (rabbitmq_mqtt_processor)
to compute the protocol state.
* Enforce AMQP 0.9.1 field name length limit
The AMQP 0.9.1 spec prohibits field names longer than 128 characters.
Therefore, when converting AMQP 1.0 message annotations, application
properties or MQTT 5.0 User Property to AMQP 0.9.1 headers, drop any
names longer than 128 characters.
* Fix type specs
Apply feedback from Michael Davis
Co-authored-by: Michael Davis <mcarsondavis@gmail.com>
* Add mc_mqtt unit test suite
Implement mc_mqtt:x_header/2
* Translate indicator that payload is UTF-8 encoded
when converting between MQTT 5.0 and AMQP 1.0
* Translate single amqp-value section from AMQP 1.0 to MQTT
Convert to a text representation, if possible, and indicate to MQTT
client that the payload is UTF-8 encoded. This way, the MQTT client will
be able to parse the payload.
If conversion to text representation is not possible, encode the payload
using the AMQP 1.0 type system and indiate the encoding via Content-Type
message/vnd.rabbitmq.amqp.
This Content-Type is not registered.
Type "message" makes sense since it's a message.
Vendor tree "vnd.rabbitmq.amqp" makes sense since merely subtype "amqp" is not
registered.
* Fix payload conversion
* Translate Response Topic between MQTT and AMQP
Translate MQTT 5.0 Response Topic to AMQP 1.0 reply-to address and vice
versa.
The Response Topic must be a UTF-8 encoded string.
This commit re-uses the already defined RabbitMQ target addresses:
```
"/topic/" RK Publish to amq.topic with routing key RK
"/exchange/" X "/" RK Publish to exchange X with routing key RK
```
By default, the MQTT topic exchange is configure dto be amq.topic using
the 1st target address.
When an operator modifies the mqtt.exchange, the 2nd target address is
used.
* Apply PR feedback
and fix formatting
Co-authored-by: Michael Davis <mcarsondavis@gmail.com>
* tidy up
* Add MQTT message_containers test
* consistent hash exchange: avoid amqp legacy conversion
When hashing on a header value.
* Avoid converting to amqp legacy when using exchange federation
* Fix test flake
* test and dialyzer fixes
* dialyzer fix
* Add MQTT protocol interoperability tests
Test receiving from and sending to MQTT 5.0 and
* AMQP 0.9.1
* AMQP 1.0
* STOMP
* Streams
* Regenerate portions of deps/rabbit/app.bzl with gazelle
I'm not exactly sure how this happened, but gazell seems to have been
run with an older version of the rules_erlang gazelle extension at
some point. This caused generation of a structure that is no longer
used. This commit updates the structure to the current pattern.
* mc: refactoring
* mc_amqpl: handle delivery annotations
Just in case they are included.
Also use iolist_to_iovec to create flat list of binaries when
converting from amqp with amqp encoded payload.
---------
Co-authored-by: David Ansari <david.ansari@gmx.de>
Co-authored-by: Michael Davis <mcarsondavis@gmail.com>
Co-authored-by: Rin Kuryloski <kuryloskip@vmware.com>
[Why]
Management metrics collection will be removed in RabbitMQ 4.0. The
prometheus plugin provides a better and more scalable alternative.
[How]
The management metrics collection is marked as deprecated in the code
using the Deprecated features subsystem (based on feature flags). See
pull request #7390 for a description of that subsystem.
To test RabbitMQ behavior as if the feature was removed, the following
configuration setting can be used:
deprecated_features.permit.management_metrics_collection = false
Management metrics collection can be turned off anytime, there are no
conditions to do that.
Once management metrics collection is turned off, the management API
will not report any metrics and the UI will show empty graphs.
Note that given the marketing calendar, the deprecated feature will go
directly from "permitted by default" to "removed" in RabbitMQ 4.0. It
won't go through the gradual deprecation process.
Bazel build files are now maintained primarily with `bazel run
gazelle`. This will analyze and merge changes into the build files as
necessitated by certain code changes (e.g. the introduction of new
modules).
In some cases there hints to gazelle in the build files, such as `#
gazelle:erlang...` or `# keep` comments. xref checks on plugins that
depend on the cli are a good example.
This reverts commit 753fa5a191.
Both rabbit_mgmt_oauth_bootstrap and rabbit_mgmt_wm_auth should
be kept for backwards compatibility with certain clients.
rabbit_mgmt_oauth_bootstrap is not hooked up to the dispatcher,
and appears to be an older version of what is now rabbit_mgmt_wm_auth
(cherry picked from commit 1209b86671)
vhost_precondition_failed => vhost_limit_exceeded
vhost_limit_exceeded is the error type used by
definition import when a per-vhost is exceeded.
It feels appropriate for this case, too.
Rather than relying on queue name conventions, allow applying policies
based on the queue type. For example, this allows multiple policies that
apply to all queue names (".*") that specify different parameters for
different queue types.
Allow list of preferred_username_claims in cuttlefish
config style.
Use new config style on two selenium test suites
Test oauth2 backend's config schema and oauth2 management
config schema
This new module sits on top of `rabbit_mnesia` and provide an API with
all cluster-related functions.
`rabbit_mnesia` should be called directly inside Mnesia-specific code
only, `rabbit_mnesia_rename` or classic mirrored queues for instance.
Otherwise, `rabbit_db_cluster` must be used.
Several modules, in particular in `rabbitmq_cli`, continue to call
`rabbit_mnesia` as a fallback option if the `rabbit_db_cluster` module
unavailable. This will be the case when the CLI will interact with an
older RabbitMQ version.
This will help with the introduction of a new database backend.
So far, we had the following functions to list nodes in a RabbitMQ
cluster:
* `rabbit_mnesia:cluster_nodes/1` to get members of the Mnesia cluster;
the argument was used to select members (all members or only those
running Mnesia and participating in the cluster)
* `rabbit_nodes:all/0` to get all members of the Mnesia cluster
* `rabbit_nodes:all_running/0` to get all members who currently run
Mnesia
Basically:
* `rabbit_nodes:all/0` calls `rabbit_mnesia:cluster_nodes(all)`
* `rabbit_nodes:all_running/0` calls `rabbit_mnesia:cluster_nodes(running)`
We also have:
* `rabbit_node_monitor:alive_nodes/1` which filters the given list of
nodes to only select those currently running Mnesia
* `rabbit_node_monitor:alive_rabbit_nodes/1` which filters the given
list of nodes to only select those currently running RabbitMQ
Most of the code uses `rabbit_mnesia:cluster_nodes/1` or the
`rabbit_nodes:all*/0` functions. `rabbit_mnesia:cluster_nodes(running)`
or `rabbit_nodes:all_running/0` is often used as a close approximation
of "all cluster members running RabbitMQ". This list might be incorrect
in times where a node is joining the clustered or is being worked on
(i.e. Mnesia is running but not RabbitMQ).
With Khepri, there won't be the same possible approximation because we
will try to keep Khepri/Ra running even if RabbitMQ is stopped to
expand/shrink the cluster.
So in order to clarify what we want when we query a list of nodes, this
patch introduces the following functions:
* `rabbit_nodes:list_members/0` to get all cluster members, regardless
of their state
* `rabbit_nodes:list_reachable/0` to get all cluster members we can
reach using Erlang distribution, regardless of the state of RabbitMQ
* `rabbit_nodes:list_running/0` to get all cluster members who run
RabbitMQ, regardless of the maintenance state
* `rabbit_nodes:list_serving/0` to get all cluster members who run
RabbitMQ and are accepting clients
In addition to the list functions, there are the corresponding
`rabbit_nodes:is_*(Node)` checks and `rabbit_nodes:filter_*(Nodes)`
filtering functions.
The code is modified to use these new functions. One possible
significant change is that the new list functions will perform RPC calls
to query the nodes' state, unlike `rabbit_mnesia:cluster_nodes(running)`.
This commit is pure refactoring making the code base more maintainable.
Replace rabbit_misc:pipeline/3 with the new OTP 25 experimental maybe
expression because
"Frequent ways in which people work with sequences of failable
operations include folds over lists of functions, and abusing list
comprehensions. Both patterns have heavy weaknesses that makes them less
than ideal."
https://www.erlang.org/eeps/eep-0049#obsoleting-messy-patterns
Additionally, this commit is more restrictive in the type spec of
rabbit_mqtt_processor state fields.
Specifically, many fields were defined to be `undefined | T` where
`undefined` was only temporarily until the first CONNECT packet was
processed by the processor.
It's better to initialise the MQTT processor upon first CONNECT packet
because there is no point in having a processor without having received
any packet.
This allows many type specs in the processor to change from `undefined |
T` to just `T`.
Additionally, memory is saved by removing the `received_connect_packet`
field from the `rabbit_mqtt_reader` and `rabbit_web_mqtt_handler`.
- Use the same base .plt everywhere, so there is no need to list
standard apps everywhere
- Fix typespecs: some typos and the use of not-exported types
The issue is that users retrieved with
the intention to list in the limits view
are not paged hence they are not wrapped
around a paging struct where users would be
under items attribute.
Pending selenium tests
as it was unnecessary to introduce it in the first place.
Remove the queue name from all queue type clients and pass the queue
name to the queue type callbacks that need it.
We have to leave feature flag classic_queue_type_delivery_support
required because we removed the monitor registry
1fd4a6d353/deps/rabbit/src/rabbit_queue_type.erl (L322-L325)
Implements review from Karl:
"rather than changing the message format we could amend the queue type
callbacks involved with the stateful operation to also take the queue
name record as an argument. This way we don't need to maintain the extra
queue name (which uses memory for known but obscurely technical reasons
with how maps work) in the queue type state (as it is used in the queue
type state map as the key)"
1. Allow to inspect an (web) MQTT connection.
2. Show MQTT client ID on connection page as part of client_properties.
3. Handle force_event_refresh (when management_plugin gets enabled
after (web) MQTT connections got created).
4. Reduce code duplication between protocol readers.
5. Display '?' instead of 'NaN' in UI for absent queue metrics.
6. Allow an (web) MQTT connection to be closed via management_plugin.
For 6. this commit takes the same approach as already done for the stream
plugin:
The stream plugin registers neither with {type, network} nor {type,
direct}.
We cannot use gen_server:call/3 anymore to close the connection
because the web MQTT connection cannot handle gen_server calls (only
casts).
Strictly speaking, this commit requires a feature flag to allow to force
closing stream connections from the management plugin during a rolling
update. However, given that this is rather an edge case, and there is a
workaround (connect to the node directly hosting the stream connection),
this commit will not introduce a new feature flag.
This allows us to stop ignorning undefined callback warnings
When mix compiles rabbitmqctl, it produces a 'consolidated' directory
alongside the 'ebin' dir. Some of the modules in consolidated are
intended to be used instead of those provided by elixir. We now handle
the conflicts properly in the bazel build.
So far, `rabbit_runtime_parameters` could be called with invalid terms.
There are probably still places in the code which lack this kind of
verification.
`rabbit_definitions:apply_defs/5` contains a bug
where it was trying to import bindings too
early (aec3dcbf37/deps/rabbit/src/rabbit_definitions.erl (L480)).
`apply_defs/4` imports exchanges at exactly the same place.
Turns out `apply_defs/5` is almost an verbatim copy of
`apply_defs/4`, just with custom error handling - and it isn't being
used anywhere in the source code.
Also removes related unused exports.
Default queue type logic didn't apply when import was done for a
single vhost. Exported definitions for a single vhost don't contain
`vhost` key, and anyway it's better to use the actual vhost name that
will be used for queue declaration.
And just in case, `vhost` is now stripped from all exported items for
a single vhost (it was still present in `parameters` and `policies`) -
on import it was ignored, but it could have been a source of the bug
similar to the one with the default queue type.
1. Allow to create queues without `x-queue-type` argument, which give
default queue type logic a chance to run. What's more, those queues
definitions will be exported without `x-queue-type`, so they can be
loaded into another vhost and default queue logic will be applied
again.
2. Show default queue type on the vhost page and the vhosts list pages
Discovered by @dumbbell
Ensure externally read strings are saved as utf-8 encoded binaries. This
is necessary since `cmd.exe` on Windows uses ISO-8859-1 encoding and
directories can have latin1 characters, like `RabbitMQ Sérvér`.
The `é` is represented by decimal `233` in the ISO-8859-1 encoding. The
unicode code point is the same decimal value, `233`, so you will see
this in the charlist data. However, when encoded using utf-8, this
becomes the two-byte sequence `C3 A9` (hexidecimal).
When reading strings from env variables and configuration, they will be
unicode charlists, with each list item representing a unicode code
point. All of Erlang string functions can handle strings in this form.
Once these strings are written to ETS or Mnesia, they will be converted
to utf-8 encoded binaries. Prior to these changes just
`list_to_binary/1` was used.
Fix xref error
re:replace requires an iodata, which is not a list of unicode code points
Correctly parse unicode vhost tags
Fix many format strings to account for utf8 input. Try again to fix unicode vhost tags
More format string fixes, try to get the CONFIG_FILE var correct
Be sure to use the `unicode` option for re:replace when necessary
More unicode format strings, add unicode option to re:split
More format strings updated
Change ~s to ~ts for vhost format strings
Change ~s to ~ts for more vhost format strings
Change ~s to ~ts for more vhost format strings
Add unicode format chars to disk monitor
Quote the directory on unix
Finally figure out the correct way to pass unicode to the port
The issue was primarily that UAA was
not properly configured. We had to whitelist
the uri used for logout otherwise UAA redirects
to its login page
WIP verify that logout.js works when running in
headless mode. For that we need a docker image
and at the moment, make docker-image is not
working because it is still using old otp 24.0.2
This is to avoid the authorization page in
UAA. It is not clear how to remove approvals therefore
once they are approved the user has the scopes approved
until they expire. If the user has some approved scopes,
they will not be prompted to approve them until they
expire hence the ui flow differs when the user has to
approve scopes and when not.