Commit Graph

432 Commits

Author SHA1 Message Date
Michael Davis f5805b83d2
Khepri: Handle breaking change in khepri adv API return type
[Why]
All callers of `khepri_adv` and `khepri_tx_adv` need updates to handle
the now uniform return type of `khepri:node_props_map()` in Khepri
0.17.0.

[How]
We don't need any compatibility code to handle "either the old return
type or the new return type" from the khepri_adv API because the
translation is done entirely in the "client side" code in Khepri -
meaning that the return value from the Ra server is the same but it is
translated differently by the functions in `khepri_adv`.

However, we need to adapt transaction functions because they may be
executed on different versions of Khepri and the behaviour of
`khepri_tx_adv` can be different. To take the possible change of return
value format, we use the new `khepri_tx:does_api_comply_with/1` to know
what to expect.
2025-04-08 18:47:27 +02:00
Loïc Hoguin c5d150a7ef
Use Erlang.mk's native Elixir support for CLI
This avoids using Mix while compiling which simplifies
a number of things and let us do further build improvements
later on.

Elixir is only enabled from within rabbitmq_cli currently.

Eunit is disabled since there are only Elixir tests.

Dialyzer will force-enable Elixir in order to process
Elixir-compiled beam files.

This commit also includes a few changes that are
related:

 * The Erlang distribution will now be started for parallel-ct

 * Many unnecessary PROJECT_MOD lines have been removed

 * `eunit_formatters` has been removed, it provides little value

 * The new `maybe_flock` Erlang.mk function is used where possible

 * Build test deps when testing rabbitmq_cli (Mix won't do it anymore)

 * rabbitmq_ct_helpers now use the early plugins to have Dialyzer
   properly set up
2025-03-18 10:02:49 +01:00
Aitor Perez 07adc3e571
Remove Bazel files 2025-03-13 13:42:34 +00:00
Michael Klishin 968eefa1bb
Bump (c) line year
There are no functional changes to this massive diff.
2025-01-01 17:54:10 -05:00
David Ansari e2a113605d Disallow transient entities in RabbitMQ AMQP 1.0 Erlang client
Transient (i.e. `durable=false`) exchanges and queues are deprecated.

Khepri will store all entities durably.
(Even exclusive queues will be stored durably. Exclusive queues are
still deleted when the declaring connection is closed.)

Similar to how the RabbitMQ AMQP 1.0 Java client already disallows the
creation of transient exchanges and queues, this commit will prohibit
the declaration of transient exchanges and queues in the RabbitMQ
AMQP 1.0 Erlang client starting with RabbitMQ 4.1.
2024-12-16 16:17:55 +01:00
David Ansari b1eb354385 Strictly validate annotations 2024-09-18 12:42:27 +02:00
Jean-Sébastien Pédron 94b8689284
Reorganize data in the Khepri store
[Why]

The previous layout followed the flat structure we have in Mnesia:
* In Mnesia, we have tables named after each purpose (exchanges, queues,
  runtime parameters and so on).
* In Khepri, we had about the same but the table names were replaced by
  a tree node in the tree. We ended up with one tree node per purpose
  at the root of the tree.

Khepri implements a tree. We could benefit from this and organize data
to reflect their relationship in RabbitMQ.

[How]

Here is the new hierarchy implemented by this commit:

    rabbitmq
    |-- users
    |   `-- $username
    |-- vhosts
    |   `-- $vhost
    |       |-- user_permissions
    |       |   `-- $username
    |       |-- exchanges
    |       |   `-- $exchange
    |       |       |-- bindings
    |       |       |   |-- queue
    |       |       |   |   `-- $queue
    |       |       |   `-- exchange
    |       |       |       `-- $exchange
    |       |       |-- consistent_hash_ring_state
    |       |       |-- jms_topic
    |       |       |-- recent_history
    |       |       |-- serial
    |       |       `-- user_permissions
    |       |           `-- $username
    |       |-- queues
    |       |   `-- $queue
    |       `-- runtime_params
    |           `-- $param_name
    |-- runtime_params
    |   `-- $param_name
    |-- mirrored_supervisors
    |   `-- $group
    |       `-- $id
    `-- node_maintenance
        `-- $node

We first define a root path in `rabbit/include/khepri.hrl` as
`[rabbitmq]`. This could be anything, including an empty path.

All paths are constructed either from this root path definition (users
and vhosts paths do that), or from a parent resource's path (exchanges
and queues paths are based on a vhost path).
2024-09-05 15:31:29 +02:00
Jean-Sébastien Pédron 1383c0c415
rabbt_db: Unify Khepri paths API
[Why]

Currently, `rabbit_db_*` modules use and export the following kind of
functions to return the path to the resources they manage:

    khepri_db_thing:khepri_things_path(),
    khepri_db_thing:khepri_thing_path(Identifier).

Internally, `khepri_db_thing:khepri_thing_path(Identifier)` appends
`Identifier` to the list returned by
`khepri_db_thing:khepri_things_path()`. This works for the organization
of the records we have today in Khepri:

    |-- thing
    |   |-- <<"identifier1">>
    |   |   <<"identifier2">>
    `-- other_thing
	`-- <<"other_identifier1">>

However, with the upcoming organization that leverages the tree in
Khepri, identifiers may be in the middle of the path instead of a leaf
component. We may also put `other_thing` under `thing` in the tree.

That's why, we can't really expose a parent directory for `thing` and
`other_thing`. Therefore, `khepri_db_thing:khepri_things_path/0` needs
to go away. Only `khepri_db_thing:khepri_thing_path/1` should be
exported and used.

In addition to that, there are several places where paths are hard-coded
(i.e. their definition is duplicated).

[How]

The patch does exactly that. Uses of
`khepri_db_thing:khepri_things_path()` are generally replaced by
`rabbit_db_thing:khepri_thing_path(?KHEPRI_WILDCARD_STAR)`.

Places where the path definitions were duplicated are fixed too by
calling the path building functions.

In the future, for a resource that depends on another one, the
corresponding module will call the `rabbit_db_thing:khepri_thing_path/1`
for that other resource and build its path on top of that.
2024-09-05 13:58:04 +02:00
David Ansari cdc5b886f8 Fix crash in consistent hash exchange
Prior to this commit, a crash occurred when a consistent hash exchange
got declared with a `hash-header` argument, but the publishing client
didn't set that header on the message.

This bug is present in RabbitMQ 3.13.0 - 3.13.6.

Fixes https://github.com/rabbitmq/rabbitmq-server/discussions/11671
2024-07-24 11:42:59 +02:00
Michael Davis 95279a31ec
rabbitmq_consistent_hash_exchange: Remove shard count for test suite
Without buildbuddy for parallelization and with the change in the parent
commit to reduce the number of cases, the suite now runs fast enough
that sharding is counterproductive.
2024-07-10 13:46:22 -04:00
Michael Davis da83358a4a
Respect RABBITMQ_METADATA_STORE in consistent hash exchange suite 2024-07-10 13:46:22 -04:00
Artem Bilan 8b7ba05f32
Consistent Hash doc improvement
Mention in the docs that `hash-header` and `hash-property` are mutually exclusive.
2024-07-09 10:47:02 -04:00
Loïc Hoguin bbfa066d79
Cleanup .gitignore files for the monorepo
We don't need to duplicate so many patterns in so many
files since we have a monorepo (and want to keep it).

If I managed to miss something or remove something that
should stay, please put it back. Note that monorepo-wide
patterns should go in the top-level .gitignore file.
Other .gitignore files are for application or folder-
specific patterns.
2024-06-28 12:00:52 +02:00
Loïc Hoguin 9f15e978b1
make: Remove xrefr
It is no longer used by Erlang.mk.
2024-06-25 13:08:08 +02:00
Loïc Hoguin 7e9cac3d00
make: Remove Travis-specific targets/config
This should no longer be used.
2024-06-24 14:12:02 +02:00
Rin Kuryloski 5debebfaf3 Use rules_elixir to build the cli without mix
Certain elixir-native deps are still build with mix, but this can be
corrected later
2024-06-18 14:50:34 +02:00
Michal Kuratczyk cfa3de4b2b
Remove unused imports (thanks elp!) 2024-05-23 16:36:08 +02:00
David Ansari 1d02ea9e55 Fix crashes when message gets dead lettered
Fix crashes when message is originally sent via AMQP and
stored within a classic or quorum queue and subsequently
dead lettered where the dead letter exchange needs access to message
annotations or properties or application-properties.
2024-05-02 07:56:00 +00:00
Rin Kuryloski 6a9d668def Set PLT_APPS in a number of plugins where it was missing 2024-04-29 14:54:28 +02:00
Michael Klishin 9c79ad8d55 More missed license header updates #9969 2024-02-05 12:26:25 -05:00
Michael Klishin f414c2d512
More missed license header updates #9969 2024-02-05 11:53:50 -05:00
David Ansari 6a3ba6210a
Reduce per message disk overhead (#10339)
* Reduce per message disk overhead

Message container annotation keys are stored on disk.
By shortening them we save 95 - 58 = 37 bytes per message.
```
1> byte_size(term_to_binary(#{exchange => <<>>, routing_keys => [<<"my-key">>], durable => true, priority => 3, timestamp => 1000})).
95
2> byte_size(term_to_binary(#{x => <<>>, rk => [<<"my-key">>], d => true, p => 3, ts => 1000})).
58
```
This should somewhat reduce disk I/O and disk space.

* Ensure durable is a boolean

Prevent key 'durable' with value 'undefined' being added to the
mc annotations, for example when the durable field was not set, but
another AMQP 1.0 header field was set.

* Apply feedback
2024-01-18 11:53:02 +01:00
Michael Klishin 01092ff31f
(c) year bumps 2024-01-01 22:02:20 -05:00
Michael Davis b3313ad21e
Migrate records from mnesia to Khepri with async commands
Using async commands via Khepri's `async` command option - which
corresponds to `ra:pipeline_command/4` - allows Ra to write multiple
commands in a batch. This can significantly boost the migration speed
since Ra can handle more commands per call to fsync and fsync seems to
be the bottleneck during migration.

We refactor the callbacks of the converter modules so that they take the
overall converter module's (`rabbit_db_m2k_converter`) state record.
That state record keeps track of the correlation IDs which are
ultimately passed to `ra:pipeline_command/4` and each converter module
calls `rabbit_db_m2k_converter:with_correlation_id/2` on the state to
get a new correlation ID and add an async request. Each async change is
briefly attached to the state record as a function so that requests can
be retried in the case that the Ra leader changes.
`with_correlation_id/2` also acts as a backpressure mechanism so we
avoid flooding the Ra process with messages.

I've set the maximum for in-flight requests to 64 to start with. The
worker pool for importing definitions has 32 members, so migration tends
to be faster than initial definition import. This could be tuned based
on memory/CPU usage however.
2023-12-12 12:01:59 -05:00
Michael Klishin 1b642353ca
Update (c) according to [1]
1. https://investors.broadcom.com/news-releases/news-release-details/broadcom-and-vmware-intend-close-transaction-november-22-2023
2023-11-21 23:18:22 -05:00
Rin Kuryloski ac1e6cc1cb Add khepri dep to rabbitmq_consistent_hash_exchange in Make 2023-10-16 16:20:18 +02:00
Diana Parra Corbacho 5f0981c5a3
Allow to use Khepri database to store metadata instead of Mnesia
[Why]

Mnesia is a very powerful and convenient tool for Erlang applications:
it is a persistent disc-based database, it handles replication accross
multiple Erlang nodes and it is available out-of-the-box from the
Erlang/OTP distribution. RabbitMQ relies on Mnesia to manage all its
metadata:

* virtual hosts' properties
* intenal users
* queue, exchange and binding declarations (not queues data)
* runtime parameters and policies
* ...

Unfortunately Mnesia makes it difficult to handle network partition and,
as a consequence, the merge conflicts between Erlang nodes once the
network partition is resolved. RabbitMQ provides several partition
handling strategies but they are not bullet-proof. Users still hit
situations where it is a pain to repair a cluster following a network
partition.

[How]

@kjnilsson created Ra [1], a Raft consensus library that RabbitMQ
already uses successfully to implement quorum queues and streams for
instance. Those queues do not suffer from network partitions.

We created Khepri [2], a new persistent and replicated database engine
based on Ra and we want to use it in place of Mnesia in RabbitMQ to
solve the problems with network partitions.

This patch integrates Khepri as an experimental feature. When enabled,
RabbitMQ will store all its metadata in Khepri instead of Mnesia.

This change comes with behavior changes. While Khepri remains disabled,
you should see no changes to the behavior of RabbitMQ. If there are
changes, it is a bug. After Khepri is enabled, there are significant
changes of behavior that you should be aware of.

Because it is based on the Raft consensus algorithm, when there is a
network partition, only the cluster members that are in the partition
with at least `(Number of nodes in the cluster ÷ 2) + 1` number of nodes
can "make progress". In other words, only those nodes may write to the
Khepri database and read from the database and expect a consistent
result.

For instance in a cluster of 5 RabbitMQ nodes:
* If there are two partitions, one with 3 nodes, one with 2 nodes, only
  the group of 3 nodes will be able to write to the database.
* If there are three partitions, two with 2 nodes, one with 1 node, none
  of the group can write to the database.

Because the Khepri database will be used for all kind of metadata, it
means that RabbitMQ nodes that can't write to the database will be
unable to perform some operations. A list of operations and what to
expect is documented in the associated pull request and the RabbitMQ
website.

This requirement from Raft also affects the startup of RabbitMQ nodes in
a cluster. Indeed, at least a quorum number of nodes must be started at
once to allow nodes to become ready.

To enable Khepri, you need to enable the `khepri_db` feature flag:

    rabbitmqctl enable_feature_flag khepri_db

When the `khepri_db` feature flag is enabled, the migration code
performs the following two tasks:
1. It synchronizes the Khepri cluster membership from the Mnesia
   cluster. It uses `mnesia_to_khepri:sync_cluster_membership/1` from
   the `khepri_mnesia_migration` application [3].
2. It copies data from relevant Mnesia tables to Khepri, doing some
   conversion if necessary on the way. Again, it uses
   `mnesia_to_khepri:copy_tables/4` from `khepri_mnesia_migration` to do
   it.

This can be performed on a running standalone RabbitMQ node or cluster.
Data will be migrated from Mnesia to Khepri without any service
interruption. Note that during the migration, the performance may
decrease and the memory footprint may go up.

Because this feature flag is considered experimental, it is not enabled
by default even on a brand new RabbitMQ deployment.

More about the implementation details below:

In the past months, all accesses to Mnesia were isolated in a collection
of `rabbit_db*` modules. This is where the integration of Khepri mostly
takes place: we use a function called `rabbit_khepri:handle_fallback/1`
which selects the database and perform the query or the transaction.
Here is an example from `rabbit_db_vhost`:

* Up until RabbitMQ 3.12.x:

        get(VHostName) when is_binary(VHostName) ->
            get_in_mnesia(VHostName).

* Starting with RabbitMQ 3.13.0:

        get(VHostName) when is_binary(VHostName) ->
            rabbit_khepri:handle_fallback(
              #{mnesia => fun() -> get_in_mnesia(VHostName) end,
                khepri => fun() -> get_in_khepri(VHostName) end}).

This `rabbit_khepri:handle_fallback/1` function relies on two things:
1. the fact that the `khepri_db` feature flag is enabled, in which case
   it always executes the Khepri-based variant.
4. the ability or not to read and write to Mnesia tables otherwise.

Before the feature flag is enabled, or during the migration, the
function will try to execute the Mnesia-based variant. If it succeeds,
then it returns the result. If it fails because one or more Mnesia
tables can't be used, it restarts from scratch: it means the feature
flag is being enabled and depending on the outcome, either the
Mnesia-based variant will succeed (the feature flag couldn't be enabled)
or the feature flag will be marked as enabled and it will call the
Khepri-based variant. The meat of this function really lives in the
`khepri_mnesia_migration` application [3] and
`rabbit_khepri:handle_fallback/1` is a wrapper on top of it that knows
about the feature flag.

However, some calls to the database do not depend on the existence of
Mnesia tables, such as functions where we need to learn about the
members of a cluster. For those, we can't rely on exceptions from
Mnesia. Therefore, we just look at the state of the feature flag to
determine which database to use. There are two situations though:

* Sometimes, we need the feature flag state query to block because the
  function interested in it can't return a valid answer during the
  migration. Here is an example:

        case rabbit_khepri:is_enabled(RemoteNode) of
            true  -> can_join_using_khepri(RemoteNode);
            false -> can_join_using_mnesia(RemoteNode)
        end

* Sometimes, we need the feature flag state query to NOT block (for
  instance because it would cause a deadlock). Here is an example:

        case rabbit_khepri:get_feature_state() of
            enabled -> members_using_khepri();
            _       -> members_using_mnesia()
        end

Direct accesses to Mnesia still exists. They are limited to code that is
specific to Mnesia such as classic queue mirroring or network partitions
handling strategies.

Now, to discover the Mnesia tables to migrate and how to migrate them,
we use an Erlang module attribute called
`rabbit_mnesia_tables_to_khepri_db` which indicates a list of Mnesia
tables and an associated converter module. Here is an example in the
`rabbitmq_recent_history_exchange` plugin:

    -rabbit_mnesia_tables_to_khepri_db(
       [{?RH_TABLE, rabbit_db_rh_exchange_m2k_converter}]).

The converter module  — `rabbit_db_rh_exchange_m2k_converter` in this
example  — is is fact a "sub" converter module called but
`rabbit_db_m2k_converter`. See the documentation of a `mnesia_to_khepri`
converter module to learn more about these modules.

[1] https://github.com/rabbitmq/ra
[2] https://github.com/rabbitmq/khepri
[3] https://github.com/rabbitmq/khepri_mnesia_migration

See #7206.

Co-authored-by: Jean-Sébastien Pédron <jean-sebastien@rabbitmq.com>
Co-authored-by: Diana Parra Corbacho <dparracorbac@vmware.com>
Co-authored-by: Michael Davis <mcarsondavis@gmail.com>
2023-09-29 16:00:11 +02:00
Jean-Sébastien Pédron b05884c530
rabbit_db: Remove `rabbit_db:run/1`
[Why]
In the Khepri ingration branch, we use a function which switches between
Khepri and Mnesia. This function is specific to Khepri, in particular to
support the migration from Mnesia to Khepri. It is also temporary and
will go away once Mnesia code is dropped.

[How]
We go back to direct calls to the Mnesia-specific functions. The
Mnesia-specific functions stay as they are to make the diff with the
Khepri integration branch easy to read.
2023-09-04 21:09:10 +02:00
Karl Nilsson 119f034406
Message Containers (#5077)
This PR implements an approach for a "protocol (data format) agnostic core" where the format of the message isn't converted at point of reception.

Currently all non AMQP 0.9.1 originating messages are converted into a AMQP 0.9.1 flavoured basic_message record before sent to a queue. If the messages are then consumed by the originating protocol they are converted back from AMQP 0.9.1. For some protocols such as MQTT 3.1 this isn't too expensive as MQTT is mostly a fairly easily mapped subset of AMQP 0.9.1 but for others such as AMQP 1.0 the conversions are awkward and in some cases lossy even if consuming from the originating protocol.

This PR instead wraps all incoming messages in their originating form into a generic, extensible message container type (mc). The container module exposes an API to get common message details such as size and various properties (ttl, priority etc) directly from the source data type. Each protocol needs to implement the mc behaviour such that when a message originating form one protocol is consumed by another protocol we convert it to the target protocol at that point.

The message container also contains annotations, dead letter records and other meta data we need to record during the lifetime of a message. The original protocol message is never modified unless it is consumed.

This includes conversion modules to and from amqp, amqpl (AMQP 0.9.1) and mqtt.


COMMIT HISTORY:

* Refactor away from using the delivery{} record

In many places including exchange types. This should make it
easier to move towards using a message container type instead of
basic_message.

Add mc module and move direct replies outside of exchange

Lots of changes incl classic queues

Implement stream support incl amqp conversions

simplify mc state record

move mc.erl

mc dlx stuff

recent history exchange

Make tracking work

But doesn't take a protocol agnostic approach as we just convert
everything into AMQP legacy and back. Might be good enough for now.

Tracing as a whole may want a bit of a re-vamp at some point.

tidy

make quorum queue peek work by legacy conversion

dead lettering fixes

dead lettering fixes

CMQ fixes

rabbit_trace type fixes

fixes

fix

Fix classic queue props

test assertion fix

feature flag and backwards compat

Enable message_container feature flag in some SUITEs

Dialyzer fixes

fixes

fix

test fixes

Various

Manually update a gazelle generated file

until a gazelle enhancement can be made
https://github.com/rabbitmq/rules_erlang/issues/185

Add message_containers_SUITE to bazel

and regen bazel files with gazelle from rules_erlang@main

Simplify essential proprty access

Such as durable, ttl and priority by extracting them into annotations
at message container init time.

Move type

to remove dependenc on amqp10 stuff in mc.erl

mostly because I don't know how to make bazel do the right thing

add more stuff

Refine routing header stuff

wip

Cosmetics

Do not use "maybe" as type name as "maybe" is a keyword since OTP 25
which makes Erlang LS complain.

* Dedup death queue names

* Fix function clause crashes

Fix failing tests in the MQTT shared_SUITE:
A classic queue message ID can be undefined as set in
fbe79ff47b/deps/rabbit/src/rabbit_classic_queue_index_v2.erl (L1048)

Fix failing tests in the MQTT shared_SUITE-mixed:
When feature flag message_containers is disabled, the
message is not an #mc{} record, but a #basic_message{} record.

* Fix is_utf8_no_null crash

Prior to this commit, the function crashed if invalid UTF-8 was
provided, e.g.:
```
1> rabbit_misc:is_valid_shortstr(<<"😇"/utf16>>).
** exception error: no function clause matching rabbit_misc:is_utf8_no_null(<<216,61,222,7>>) (rabbit_misc.erl, line 1481)
```

* Implement mqtt mc behaviour

For now via amqp translation.

This is still work in progress, but the following SUITEs pass:
```
make -C deps/rabbitmq_mqtt ct-shared t=[mqtt,v5,cluster_size_1] FULL=1
make -C deps/rabbitmq_mqtt ct-v5 t=[mqtt,cluster_size_1] FULL=1
```

* Shorten mc file names

Module name length matters because for each persistent message the #mc{}
record is persisted to disk.

```
1> iolist_size(term_to_iovec({mc, rabbit_mc_amqp_legacy})).
30
2> iolist_size(term_to_iovec({mc, mc_amqpl})).
17
```

This commit renames the mc modules:
```
ag -l rabbit_mc_amqp_legacy | xargs sed -i 's/rabbit_mc_amqp_legacy/mc_amqpl/g'
ag -l rabbit_mc_amqp | xargs sed -i 's/rabbit_mc_amqp/mc_amqp/g'
ag -l rabbit_mqtt_mc | xargs sed -i 's/rabbit_mqtt_mc/mc_mqtt/g'
```

* mc: make deaths an annotation + fixes

* Fix mc_mqtt protocol_state callback

* Fix test will_delay_node_restart

```
make -C deps/rabbitmq_mqtt ct-v5 t=[mqtt,cluster_size_3]:will_delay_node_restart FULL=1
```

* Bazel run gazelle

* mix format rabbitmqctl.ex

* Ensure ttl annotation is refelected in amqp legacy protocol state

* Fix id access in message store

* Fix rabbit_message_interceptor_SUITE

* dializer fixes

* Fix rabbit:rabbit_message_interceptor_SUITE-mixed

set_annotation/3 should not result in duplicate keys

* Fix MQTT shared_SUITE-mixed

Up to 3.12 non-MQTT publishes were always QoS 1 regardless of delivery_mode.
75a953ce28/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl (L2075-L2076)
From now on, non-MQTT publishes are QoS 1 if durable.
This makes more sense.

The MQTT plugin must send a #basic_message{} to an old node that does
not understand message containers.

* Field content of 'v1_0.data' can be binary

Fix
```
bazel test //deps/rabbitmq_mqtt:shared_SUITE-mixed \
    --test_env FOCUS="-group [mqtt,v4,cluster_size_1] -case trace" \
    -t- --test_sharding_strategy=disabled
```

* Remove route/2 and implement route/3 for all exchange types.

This removes the route/2 callback from rabbit_exchange_type and
makes route/3 mandatory instead. This is a breaking change and
will require all implementations of exchange types to update their
code, however this is necessary anyway for them to correctly handle
the mc type.

stream filtering fixes

* Translate directly from MQTT to AMQP 0.9.1

* handle undecoded properties in mc_compat

amqpl: put clause in right order

recover death deatails from amqp data

* Replace callback init_amqp with convert_from

* Fix return value of lists:keyfind/3

* Translate directly from AMQP 0.9.1 to MQTT

* Fix MQTT payload size

MQTT payload can be a list when converted from AMQP 0.9.1 for example

First conversions tests

Plus some other conversion related fixes.

bazel

bazel

translate amqp 1.0 null to undefined

mc: property/2 and correlation_id/message_id return type tagged values.

To ensure we can support a variety of types better.

The type type tags are AMQP 1.0 flavoured.

fix death recovery

mc_mqtt: impl new api

Add callbacks to allow protocols to compact data before storage

And make readable if needing to query things repeatedly.

bazel fix

* more decoding

* tracking mixed versions compat

* mc: flip default of `durable` annotation to save some data.

Assuming most messages are durable and that in memory messages suffer less
from persistence overhead it makes sense for a non existent `durable`
annotation to mean durable=true.

* mc conversion tests and tidy up

* mc make x_header unstrict again

* amqpl: death record fixes

* bazel

* amqp -> amqpl conversion test

* Fix crash in mc_amqp:size/1

Body can be a single amqp-value section (instead of
being a list) as shown by test
```
make -C deps/rabbitmq_amqp1_0/ ct-system t=java
```
on branch native-amqp.

* Fix crash in lists:flatten/1

Data can be a single amqp-value section (instead of
being a list) as shown by test
```
make -C deps/rabbitmq_amqp1_0 ct-system t=dotnet:roundtrip_to_amqp_091
```
on branch native-amqp.

* Fix crash in rabbit_writer

Running test
```
make -C deps/rabbitmq_amqp1_0 ct-system t=dotnet:roundtrip_to_amqp_091
```
on branch native-amqp resulted in the following crash:
```
crasher:
  initial call: rabbit_writer:enter_mainloop/2
  pid: <0.711.0>
  registered_name: []
  exception error: bad argument
    in function  size/1
       called as size([<<0>>,<<"Sw">>,[<<160,2>>,<<"hi">>]])
       *** argument 1: not tuple or binary
    in call from rabbit_binary_generator:build_content_frames/7 (rabbit_binary_generator.erl, line 89)
    in call from rabbit_binary_generator:build_simple_content_frames/4 (rabbit_binary_generator.erl, line 61)
    in call from rabbit_writer:assemble_frames/5 (rabbit_writer.erl, line 334)
    in call from rabbit_writer:internal_send_command_async/3 (rabbit_writer.erl, line 365)
    in call from rabbit_writer:handle_message/2 (rabbit_writer.erl, line 265)
    in call from rabbit_writer:handle_message/3 (rabbit_writer.erl, line 232)
    in call from rabbit_writer:mainloop1/2 (rabbit_writer.erl, line 223)
```
because #content.payload_fragments_rev is currently supposed to
be a flat list of binaries instead of being an iolist.

This commit fixes this crash inefficiently by calling
iolist_to_binary/1. A better solution would be to allow AMQP legacy's #content.payload_fragments_rev
to be an iolist.

* Add accidentally deleted line back

* mc: optimise mc_amqp internal format

By removint the outer records for message and delivery annotations
as well as application properties and footers.

* mc: optimis mc_amqp map_add by using upsert

* mc: refactoring and bug fixes

* mc_SUITE routingheader assertions

* mc remove serialize/1 callback as only used by amqp

* mc_amqp: avoid returning a nested list from protocol_state

* test and bug fix

* move infer_type to mc_util

* mc fixes and additiona assertions

* Support headers exchange routing for MQTT messages

When a headers exchange is bound to the MQTT topic exchange, routing
will be performend based on both MQTT topic (by the topic exchange) and
MQTT User Property (by the headers exchange).

This combines the best worlds of both MQTT 5.0 and AMQP 0.9.1 and
enables powerful routing topologies.

When the User Property contains the same name multiple times, only the
last name (and value) will be considered by the headers exchange.

* Fix crash when sending from stream to amqpl

When publishing a message via the stream protocol and consuming it via
AMQP 0.9.1, the following crash occurred prior to this commit:
```
crasher:
  initial call: rabbit_channel:init/1
  pid: <0.818.0>
  registered_name: []
  exception exit: {{badmatch,undefined},
                   [{rabbit_channel,handle_deliver0,4,
                                    [{file,"rabbit_channel.erl"},
                                     {line,2728}]},
                    {lists,foldl,3,[{file,"lists.erl"},{line,1594}]},
                    {rabbit_channel,handle_cast,2,
                                    [{file,"rabbit_channel.erl"},
                                     {line,728}]},
                    {gen_server2,handle_msg,2,
                                 [{file,"gen_server2.erl"},{line,1056}]},
                    {proc_lib,wake_up,3,
                              [{file,"proc_lib.erl"},{line,251}]}]}
```

This commit first gives `mc:init/3` the chance to set exchange and
routing_keys annotations.
If not set, `rabbit_stream_queue` will set these annotations assuming
the message was originally published via the stream protocol.

* Support consistent hash exchange routing for MQTT 5.0

When a consistent hash exchange is bound to the MQTT topic exchange,
MQTT 5.0 messages can be routed to queues consistently based on the
Correlation-Data in the PUBLISH packet.

* Convert MQTT 5.0 User Property

* to AMQP 0.9.1 headers
* from AMQP 0.9.1 headers
* to AMQP 1.0 application properties and message annotations
* from AMQP 1.0 application properties and message annotations

* Make use of Annotations in mc_mqtt:protocol_state/2

mc_mqtt:protocol_state/2 includes Annotations as parameter.
It's cleaner to make use of these Annotations when computing the
protocol state instead of relying on the caller (rabbitmq_mqtt_processor)
to compute the protocol state.

* Enforce AMQP 0.9.1 field name length limit

The AMQP 0.9.1 spec prohibits field names longer than 128 characters.
Therefore, when converting AMQP 1.0 message annotations, application
properties or MQTT 5.0 User Property to AMQP 0.9.1 headers, drop any
names longer than 128 characters.

* Fix type specs

Apply feedback from Michael Davis

Co-authored-by: Michael Davis <mcarsondavis@gmail.com>

* Add mc_mqtt unit test suite

Implement mc_mqtt:x_header/2

* Translate indicator that payload is UTF-8 encoded

when converting between MQTT 5.0 and AMQP 1.0

* Translate single amqp-value section from AMQP 1.0 to MQTT

Convert to a text representation, if possible, and indicate to MQTT
client that the payload is UTF-8 encoded. This way, the MQTT client will
be able to parse the payload.

If conversion to text representation is not possible, encode the payload
using the AMQP 1.0 type system and indiate the encoding via Content-Type
message/vnd.rabbitmq.amqp.

This Content-Type is not registered.
Type "message" makes sense since it's a message.
Vendor tree "vnd.rabbitmq.amqp" makes sense since merely subtype "amqp" is not
registered.

* Fix payload conversion

* Translate Response Topic between MQTT and AMQP

Translate MQTT 5.0 Response Topic to AMQP 1.0 reply-to address and vice
versa.

The Response Topic must be a UTF-8 encoded string.

This commit re-uses the already defined RabbitMQ target addresses:
```
"/topic/"     RK        Publish to amq.topic with routing key RK
"/exchange/"  X "/" RK  Publish to exchange X with routing key RK
```

By default, the MQTT topic exchange is configure dto be amq.topic using
the 1st target address.

When an operator modifies the mqtt.exchange, the 2nd target address is
used.

* Apply PR feedback

and fix formatting

Co-authored-by: Michael Davis <mcarsondavis@gmail.com>

* tidy up

* Add MQTT message_containers test

* consistent hash exchange: avoid amqp legacy conversion

When hashing on a header value.

* Avoid converting to amqp legacy when using exchange federation

* Fix test flake

* test and dialyzer fixes

* dialyzer fix

* Add MQTT protocol interoperability tests

Test receiving from and sending to MQTT 5.0 and
* AMQP 0.9.1
* AMQP 1.0
* STOMP
* Streams

* Regenerate portions of deps/rabbit/app.bzl with gazelle

I'm not exactly sure how this happened, but gazell seems to have been
run with an older version of the rules_erlang gazelle extension at
some point. This caused generation of a structure that is no longer
used. This commit updates the structure to the current pattern.

* mc: refactoring

* mc_amqpl: handle delivery annotations

Just in case they are included.

Also use iolist_to_iovec to create flat list of binaries when
converting from amqp with amqp encoded payload.

---------

Co-authored-by: David Ansari <david.ansari@gmx.de>
Co-authored-by: Michael Davis <mcarsondavis@gmail.com>
Co-authored-by: Rin Kuryloski <kuryloskip@vmware.com>
2023-08-31 11:27:13 +01:00
Michael Klishin 55442aa914 Replace @rabbitmq.com addresses with rabbitmq-core@groups.vmware.com
Don't ask why we have to do it. Because reasons!
2023-06-20 15:40:13 +04:00
Rin Kuryloski a944439fba Replace globs in bazel with explicit lists of files
As this is preferred in rules_erlang 3.9.14
2023-04-25 17:29:12 +02:00
Rin Kuryloski 854d01d9a5 Restore the original -include_lib statements from before #6466
since this broke erlang_ls

requires rules_erlang 3.9.13
2023-04-20 12:40:45 +02:00
Rin Kuryloski 8de8f59d47 Use gazelle generated bazel files
Bazel build files are now maintained primarily with `bazel run
gazelle`. This will analyze and merge changes into the build files as
necessitated by certain code changes (e.g. the introduction of new
modules).

In some cases there hints to gazelle in the build files, such as `#
gazelle:erlang...` or `# keep` comments. xref checks on plugins that
depend on the cli are a good example.
2023-04-17 18:13:18 +02:00
Rin Kuryloski 8a7eee6a86 Ignore warnings when building plt files for dependencies
As we don't generally care if a dependency has warnings, only the
target
2023-04-17 10:09:24 +02:00
Alexey Lebedeff 949b53543d Fix all dependencies for the dialyzer
This is the latest commit in the series, it fixes (almost) all the
problems with missing and circular dependencies for typing.

The only 2 unsolved problems are:

- `lg` dependency for `rabbit` - the problem is that it's the only
  dependency that contains NIF. And there is no way to make dialyzer
  ignore it - looks like unknown check is not suppressable by dialyzer
  directives. In the future making `lg` a proper dependency can be a
  good thing anyway.

- some missing elixir function in `rabbitmq_cli` (CSV, JSON and
  logging related).

- `eetcd` dependency for `rabbitmq_peer_discovery_etcd` - this one
  uses sub-directories in `src/`, which confuses dialyzer (or our bazel
  machinery is not able to properly handle it). I've tried the latest
  rules_erlang which flattens directory for .beam files, but it wasn't
  enough for dialyzer - it wasn't able to find core erlang files. This
  is a niche plugin and an unusual dependency, so probably not worth
  investigating further.
2023-02-13 17:37:44 +01:00
Alexey Lebedeff c7da0da8b8 Cleanup dialyzer calls
- Use the same base .plt everywhere, so there is no need to list
standard apps everywhere
- Fix typespecs: some typos and the use of not-exported types
2023-02-06 17:05:30 +01:00
Diana Parra Corbacho 9cf10ed8a7 Unit test rabbit_db_* modules, spec and API updates 2023-02-02 15:01:42 +01:00
Diana Parra Corbacho f2443f6d10 Move mnesia queries from rabbit_misc to rabbit_mnesia 2023-01-31 10:23:16 +01:00
Diana Parra Corbacho 783996f53d Move consistent hash exchange Mnesia-specific code to rabbit_db_ch_exchange module 2023-01-31 10:23:16 +01:00
Rin Kuryloski b84e746ee9 Rework plt/dialyze for rabbitmqctl and plugins that depend on it
This allows us to stop ignorning undefined callback warnings

When mix compiles rabbitmqctl, it produces a 'consolidated' directory
alongside the 'ebin' dir. Some of the modules in consolidated are
intended to be used instead of those provided by elixir. We now handle
the conflicts properly in the bazel build.
2023-01-19 17:29:23 +01:00
Rin Kuryloski 5ef8923462 Avoid the need to pass package name to rabbitmq_integration_suite 2023-01-18 15:25:27 +01:00
Rin Kuryloski a317b30807 Use improved assert_suites2 macro from rules_erlang 3.9.0 2023-01-18 15:07:06 +01:00
Michael Klishin ec4f1dba7d
(c) year bump: 2022 => 2023 2023-01-01 23:17:36 -05:00
Luke Bakken 7fe159edef
Yolo-replace format strings
Replaces `~s` and `~p` with their unicode-friendly counterparts.

```
git ls-files *.erl | xargs sed -i.ORIG -e s/~s>/~ts/g -e s/~p>/~tp/g
```
2022-10-10 10:32:03 +04:00
Rin Kuryloski 575c5f9975 Remove all of the .travis.yml files
since we no longer use them
2022-08-16 09:46:31 +02:00
Michael Klishin 25f4425131 CHX: document the limitations introduced in #5121 2022-07-01 12:29:00 +04:00
David Ansari 5ebbb75c01 Fix log format in consistent hash exchange
Before this commit:
```
Consistent hashing exchange: removing binding from exchange '"exchange 'e1' in vhost '/'"' to destination '"queue 'qq2' in vhost '/'"' with routing key '2'
```

The single quotes do not make sense.
2022-06-30 13:19:11 +00:00
David Ansari ba22b3307f Add some function and type specs 2022-06-30 09:45:46 +00:00
David Ansari 878f369b7a Make adding bindings idempotent
First binding wins.
Duplicate bindings, i.e. bindings with the same source exchange and
same destination queue / exchange but possibly different routing key
(weight) are ignored from now on by the consistent hash exchange.

This applies only to bindings being added.
For bindings being deleted, any duplicate binding (independent of its
routing key) will delete all buckets for the given source and
destination. (This is to ensure that buckets for a given source and
destination can be deleted for when upgrading from a version prior
to this commit. This was also the behaviour prior to this commit,
so nothing changes in that regard.)

Note that duplicate bindings continue to be created in RabbitMQ.
(They are only ignored by the consistent hash exchange.)

Adding a binding will perform linear search in the bucket map.
This is already stated in the README:
"These two operations use linear algorithms to update the ring."

The linear search when adding a binding could be optimised by
adding another Mnesia table field which will require a new migration and
feature flag. Hence, such an optimization is left out in this commit.

Fixes #3386.
2022-06-30 09:24:02 +00:00
Philip Kuryloski a250a533a4 Remove elixir related -ignore_xref calls
As they are no longer necessary with xref2 and the erlang.mk updates
2022-06-09 23:18:40 +02:00