* MQTT: avoid an exception
when an AMQP 0-9-1 publisher publishes a message
that has expiration set.
Stack trace was contributed in #12707 by @rdsilio.
* mc_mqtt_SUITE test for #12707#12710
* MQTT protocol_interop_SUITE: new test for #12710#12707
* Simplify tests
---------
Co-authored-by: David Ansari <david.ansari@gmx.de>
Support x-cc message annotation
Support an `x-cc` message annotation in AMQP 1.0
similar to the [CC](https://www.rabbitmq.com/docs/sender-selected) header in AMQP 0.9.1.
The value of the `x-cc` message annotation must by a list of strings.
A message annotation is used since application properties allow only simple types.
This commit is a follow up of https://github.com/rabbitmq/rabbitmq-server/pull/11604
This commit changes the AMQP address format v2 from
```
/e/:exchange/:routing-key
/e/:exchange
/q/:queue
```
to
```
/exchanges/:exchange/:routing-key
/exchanges/:exchange
/queues/:queue
```
Advantages:
1. more user friendly
2. matches nicely with the plural forms of HTTP API v1 and HTTP API v2
This plural form is still non-overlapping with AMQP address format v1.
Although it might feel unusual at first to send a message to `/queues/q1`,
if you think about `queues` just being a namespace or entity type, this
address format makes sense.
to distinguish between v1 and v2 address formats.
Previously, v1 and v2 address formats overlapped and behaved differently
for example for:
```
/queue/:queue
/exchange/:exchange
```
This PR changes the v2 format to:
```
/e/:exchange/:routing-key
/e/:exchange
/q/:queue
```
to distinguish between v1 and v2 addresses.
This allows to call `rabbit_deprecated_features:is_permitted(amqp_address_v1)`
only if we know that the user requests address format v1.
Note that `rabbit_deprecated_features:is_permitted/1` should only
be called when the old feature is actually used.
Use percent encoding / decoding for address URI format v2.
This allows to use any UTF-8 encoded characters including slashes (`/`)
in routing keys, exchange names, and queue names and is more future
safe.
Prior to this commit the entire amqp-value or amqp-sequence sections
were parsed when converting a message from mc_amqp.
Parsing the entire amqp-value or amqp-sequence section can generate a
huge amount of garbage depending on how large these sections are.
Given that other protocol cannot make use of amqp-value and
amqp-sequence sections anyway, leave them AMQP encoded when converting
from mc_amqp.
In fact prior to this commit, the entire body section was parsed
generating huge amounts of garbage just to subsequently encode it again
in mc_amqpl or mc_mqtt.
The new conversion interface from mc_amqp to other mc_* modules will
either output amqp-data sections or the encoded amqp-value /
amqp-sequence sections.
This commit fixes test
```
bazel test //deps/rabbitmq_mqtt:shared_SUITE-mixed -t- \
--test_sharding_strategy=disabled --test_env \
FOCUS="-group [mqtt,v3,cluster_size_3] -case pubsub"
```
Fix some mixed version tests
Assume the AMQP body, especially amqp-value section won't be parsed.
Hence, omit smart conversions from AMQP to MQTT involving the
Payload-Format-Indicator bit.
Fix test
Fix
```
bazel test //deps/amqp10_client:system_SUITE-mixed -t- --test_sharding_strategy=disabled --test_env FOCUS="-group [rabbitmq]
```
By default, when the 'durable' message container (mc) annotation is unset,
messages are interpreted to be durable.
Prior to this commit, MQTT messages that were sent with QoS 0 were
stored durably in classic queues.
This commit takes the same approach for mc_mqtt as for mc_amqpl and mc_amqp:
If the message is durable, the durable mc annotation will not be set.
If the message is non-durable, the durable mc annotation will be set to false.
## What?
Introduce a new address format (let's call it v2) for AMQP 1.0 source and target addresses.
The old format (let's call it v1) is described in
https://github.com/rabbitmq/rabbitmq-server/tree/v3.13.x/deps/rabbitmq_amqp1_0#routing-and-addressing
The only v2 source address format is:
```
/queue/:queue
```
The 4 possible v2 target addresses formats are:
```
/exchange/:exchange/key/:routing-key
/exchange/:exchange
/queue/:queue
<null>
```
where the last AMQP <null> value format requires that each message’s `to` field contains one of:
```
/exchange/:exchange/key/:routing-key
/exchange/:exchange
/queue/:queue
```
## Why?
The AMQP address v1 format comes with the following flaws:
1. Obscure address format:
Without reading the documentation, the differences for example between source addresses
```
/amq/queue/:queue
/queue/:queue
:queue
```
are unknown to users. Hence, the address format is obscure.
2. Implicit creation of topologies
Some address formats implicitly create queues (and bindings), such as source address
```
/exchange/:exchange/:binding-key
```
or target address
```
/queue/:queue
```
These queues and bindings are never deleted (by the AMQP 1.0 plugin.)
Implicit creation of such topologies is also obscure.
3. Redundant address formats
```
/queue/:queue
:queue
```
have the same meaning and are therefore redundant.
4. Properties section must be parsed to determine whether a routing key is present
Target address
```
/exchange/:exchange
```
requires RabbitMQ to parse the properties section in order to check whether the message `subject` is set.
If `subject` is not set, the routing key will default to the empty string.
5. Using `subject` as routing key misuses the purpose of this field.
According to the AMQP spec, the message `subject` field's purpose is:
> A common field for summary information about the message content and purpose.
6. Exchange names, queue names and routing keys must not contain the "/" (slash) character.
The current 3.13 implemenation splits by "/" disallowing these
characters in exchange, and queue names, and routing keys which is
unnecessary prohibitive.
7. Clients must create a separate link per target exchange
While this is reasonable working assumption, there might be rare use
cases where it could make sense to create many exchanges (e.g. 1
exchange per queue, see
https://github.com/rabbitmq/rabbitmq-server/discussions/10708) and have
a single application publish to all these exchanges.
With the v1 address format, for an application to send to 500 different
exchanges, it needs to create 500 links.
Due to these disadvantages and thanks to #10559 which allows clients to explicitly create topologies,
we can create a simpler, clearer, and better v2 address format.
## How?
### Design goals
Following the 7 cons from v1, the design goals for v2 are:
1. The address format should be simple so that users have a chance to
understand the meaning of the address without necessarily consulting the docs.
2. The address format should not implicitly create queues, bindings, or exchanges.
Instead, topologies should be created either explicitly via the new management node
prior to link attachment (see #10559), or in future, we might support the `dynamic`
source or target properties so that RabbitMQ creates queues dynamically.
3. No redundant address formats.
4. The target address format should explicitly state whether the routing key is present, empty,
or will be provided dynamically in each message.
5. `Subject` should not be used as routing key. Instead, a better
fitting field should be used.
6. Exchange names, queue names, and routing keys should allow to contain
valid UTF-8 encoded data including the "/" character.
7. Allow both target exchange and routing key to by dynamically provided within each message.
Furthermore
8. v2 must co-exist with v1 for at least some time. Applications should be able to upgrade to
RabbitMQ 4.0 while continuing to use v1. Examples include AMQP 1.0 shovels and plugins communicating
between a 4.0 and a 3.13 cluster. Starting with 4.1, we should change the AMQP 1.0 shovel and plugin clients
to use only the new v2 address format. This will allow AMQP 1.0 and plugins to communicate between a 4.1 and 4.2 cluster.
We will deprecate v1 in 4.0 and remove support for v1 in a later 4.x version.
### Additional Context
The address is usually a String, but can be of any type.
The [AMQP Addressing extension](https://docs.oasis-open.org/amqp/addressing/v1.0/addressing-v1.0.html)
suggests that addresses are URIs and are therefore hierarchical and could even contain query parameters:
> An AMQP address is a URI reference as defined by RFC3986.
> the path expression is a sequence of identifier segments that reflects a path through an
> implementation specific relationship graph of AMQP nodes and their termini.
> The path expression MUST resolve to a node’s terminus in an AMQP container.
The [Using the AMQP Anonymous Terminus for Message Routing Version 1.0](https://docs.oasis-open.org/amqp/anonterm/v1.0/cs01/anonterm-v1.0-cs01.html)
extension allows for the target being `null` and the `To` property to contain the node address.
This corresponds to AMQP 0.9.1 where clients can send each message on the same channel to a different `{exchange, routing-key}` destination.
The following v2 address formats will be used.
### v2 addresses
A new deprecated feature flag `amqp_address_v1` will be introduced in 4.0 which is permitted by default.
Starting with 4.1, we should change the AMQP 1.0 shovel and plugin AMQP 1.0 clients to use only the new v2 address format.
However, 4.1 server code must still understand the 4.0 AMQP 1.0 shovel and plugin AMQP 1.0 clients’ v1 address format.
The new deprecated feature flag will therefore be denied by default in 4.2.
This allows AMQP 1.0 shovels and plugins to work between
* 4.0 and 3.13 clusters using v1
* 4.1 and 4.0 clusters using v2 from 4.1 to v4.0 and v1 from 4.0 to 4.1
* 4.2 and 4.1 clusters using v2
without having to support both v1 and v2 at the same time in the AMQP 1.0 shovel and plugin clients.
While supporting both v1 and v2 in these clients is feasible, it's simpler to switch the client code directly from v1 to v2.
### v2 source addresses
The source address format is
```
/queue/:queue
```
If the deprecated feature flag `amqp_address_v1` is permitted and the queue does not exist, the queue will be auto-created.
If the deprecated feature flag `amqp_address_v1` is denied, the queue must exist.
### v2 target addresses
v1 requires attaching a new link for each destination exchange.
v2 will allow dynamic `{exchange, routing-key}` combinations for a given link.
v2 therefore allows for the rare use cases where a single AMQP 1.0 publisher app needs to send to many different exchanges.
Setting up a link per destination exchange could be cumbersome.
Hence, v2 will support the dynamic `{exchange, routing-key}` combinations of AMQP 0.9.1.
To achieve this, we make use of the "Anonymous Terminus for Message Routing" extension:
The target address will contain the AMQP value null.
The `To` field in each message must be set and contain either address format
```
/exchange/:exchange/key/:routing-key
```
or
```
/exchange/:exchange
```
when using the empty routing key.
The `to` field requires an address type and is better suited than the `subject field.
Note that each message will contain this `To` value for the anonymous terminus.
Hence, we should save some bytes being sent across the network and stored on disk.
Using a format
```
/e/:exchange/k/:routing-key
```
saves more bytes, but is too obscure.
However, we use only `/key/` instead of `/routing-key/` so save a few bytes.
This also simplifies the format because users don’t have to remember whether to use spell `routing-key` or `routing_key` or `routingkey`.
The other allowed target address formats are:
```
/exchange/:exchange/key/:routing-key
```
where exchange and routing key are static on the given link.
```
/exchange/:exchange
```
where exchange and routing key are static on the given link, and routing key will be the empty string (useful for example for the fanout exchange).
```
/queue/:queue
```
This provides RabbitMQ beginners the illusion of sending a message directly
to a queue without having to understand what exchanges and routing keys are.
If the deprecated feature flag `amqp_address_v1` is permitted and the queue does not exist, the queue will be auto-created.
If the deprecated feature flag `amqp_address_v1` is denied, the queue must exist.
Besides the additional queue existence check, this queue target is different from
```
/exchange//key/:queue
```
in that queue specific optimisations might be done (in future) by RabbitMQ
(for example different receiving queue types could grant different amounts of link credits to the sending clients).
A write permission check to the amq.default exchange will be performed nevertheless.
v2 will prohibit the v1 static link & dynamic routing-key combination
where the routing key is sent in the message `subject` as that’s also obscure.
For this use case, v2’s new anonymous terminus can be used where both exchange and routing key are defined in the message’s `To` field.
(The bare message must not be modified because it could be signed.)
The alias format
```
/topic/:topic
```
will also be removed.
Sending to topic exchanges is arguably an advanced feature.
Users can directly use the format
```
/exchange/amq.topic/key/:topic
```
which reduces the number of redundant address formats.
### v2 address format reference
To sump up (and as stated at the top of this commit message):
The only v2 source address format is:
```
/queue/:queue
```
The 4 possible v2 target addresses formats are:
```
/exchange/:exchange/key/:routing-key
/exchange/:exchange
/queue/:queue
<null>
```
where the last AMQP <null> value format requires that each message’s `to` field contains one of:
```
/exchange/:exchange/key/:routing-key
/exchange/:exchange
/queue/:queue
```
Hence, all 8 listed design goals are reached.
An AMQP boolean can by encoded using 1 byte or 2 bytes:
https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-types-v1.0-os.html#type-boolean
Prior to this commit, our Erlang parser returned:
* Erlang terms `true` or `false` for the 1 byte AMQP encoding
* Erlang terms `{boolean, true}` or `{boolean, false}` for the 2 byte AMQP enconding
Having a serializer and parser that perform the opposite actions such
that
```
Term = parse(serialize(Term))
```
is desirable as it provides a symmetric property useful not only for
property based testing, but also for avoiding altering message hashes
when serializing and parsing the same term.
However, dealing wth `{boolean, boolean()}` tuples instead of `boolean()` is very unhandy since
all Erlang code must take care of both forms leading to subtle bugs as
occurred in:
* 4cbeab8974/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_message.erl (L155-L158)
* b8173c9d3b/deps/rabbitmq_mqtt/src/mc_mqtt.erl (L83-L88)
* b8173c9d3b/deps/rabbit/src/mc_amqpl.erl (L123-L127)
Therefore, this commits decides to take the safe approach and always
parse to an Erlang `boolean()` independent of whether the AMQP boolean
was encoded with 1 or 2 bytes.
* Reduce per message disk overhead
Message container annotation keys are stored on disk.
By shortening them we save 95 - 58 = 37 bytes per message.
```
1> byte_size(term_to_binary(#{exchange => <<>>, routing_keys => [<<"my-key">>], durable => true, priority => 3, timestamp => 1000})).
95
2> byte_size(term_to_binary(#{x => <<>>, rk => [<<"my-key">>], d => true, p => 3, ts => 1000})).
58
```
This should somewhat reduce disk I/O and disk space.
* Ensure durable is a boolean
Prevent key 'durable' with value 'undefined' being added to the
mc annotations, for example when the durable field was not set, but
another AMQP 1.0 header field was set.
* Apply feedback
To refine conversion behaviour add additional tests
and ensure it matches the documentation.
mc: optionally capture source environment
And pass target environment to mc:convert
This allows environmental data and configuration to be captured and
used to modify and complete conversion logic whilst allowing conversion
code to remain pure and portable.
This PR implements an approach for a "protocol (data format) agnostic core" where the format of the message isn't converted at point of reception.
Currently all non AMQP 0.9.1 originating messages are converted into a AMQP 0.9.1 flavoured basic_message record before sent to a queue. If the messages are then consumed by the originating protocol they are converted back from AMQP 0.9.1. For some protocols such as MQTT 3.1 this isn't too expensive as MQTT is mostly a fairly easily mapped subset of AMQP 0.9.1 but for others such as AMQP 1.0 the conversions are awkward and in some cases lossy even if consuming from the originating protocol.
This PR instead wraps all incoming messages in their originating form into a generic, extensible message container type (mc). The container module exposes an API to get common message details such as size and various properties (ttl, priority etc) directly from the source data type. Each protocol needs to implement the mc behaviour such that when a message originating form one protocol is consumed by another protocol we convert it to the target protocol at that point.
The message container also contains annotations, dead letter records and other meta data we need to record during the lifetime of a message. The original protocol message is never modified unless it is consumed.
This includes conversion modules to and from amqp, amqpl (AMQP 0.9.1) and mqtt.
COMMIT HISTORY:
* Refactor away from using the delivery{} record
In many places including exchange types. This should make it
easier to move towards using a message container type instead of
basic_message.
Add mc module and move direct replies outside of exchange
Lots of changes incl classic queues
Implement stream support incl amqp conversions
simplify mc state record
move mc.erl
mc dlx stuff
recent history exchange
Make tracking work
But doesn't take a protocol agnostic approach as we just convert
everything into AMQP legacy and back. Might be good enough for now.
Tracing as a whole may want a bit of a re-vamp at some point.
tidy
make quorum queue peek work by legacy conversion
dead lettering fixes
dead lettering fixes
CMQ fixes
rabbit_trace type fixes
fixes
fix
Fix classic queue props
test assertion fix
feature flag and backwards compat
Enable message_container feature flag in some SUITEs
Dialyzer fixes
fixes
fix
test fixes
Various
Manually update a gazelle generated file
until a gazelle enhancement can be made
https://github.com/rabbitmq/rules_erlang/issues/185
Add message_containers_SUITE to bazel
and regen bazel files with gazelle from rules_erlang@main
Simplify essential proprty access
Such as durable, ttl and priority by extracting them into annotations
at message container init time.
Move type
to remove dependenc on amqp10 stuff in mc.erl
mostly because I don't know how to make bazel do the right thing
add more stuff
Refine routing header stuff
wip
Cosmetics
Do not use "maybe" as type name as "maybe" is a keyword since OTP 25
which makes Erlang LS complain.
* Dedup death queue names
* Fix function clause crashes
Fix failing tests in the MQTT shared_SUITE:
A classic queue message ID can be undefined as set in
fbe79ff47b/deps/rabbit/src/rabbit_classic_queue_index_v2.erl (L1048)
Fix failing tests in the MQTT shared_SUITE-mixed:
When feature flag message_containers is disabled, the
message is not an #mc{} record, but a #basic_message{} record.
* Fix is_utf8_no_null crash
Prior to this commit, the function crashed if invalid UTF-8 was
provided, e.g.:
```
1> rabbit_misc:is_valid_shortstr(<<"😇"/utf16>>).
** exception error: no function clause matching rabbit_misc:is_utf8_no_null(<<216,61,222,7>>) (rabbit_misc.erl, line 1481)
```
* Implement mqtt mc behaviour
For now via amqp translation.
This is still work in progress, but the following SUITEs pass:
```
make -C deps/rabbitmq_mqtt ct-shared t=[mqtt,v5,cluster_size_1] FULL=1
make -C deps/rabbitmq_mqtt ct-v5 t=[mqtt,cluster_size_1] FULL=1
```
* Shorten mc file names
Module name length matters because for each persistent message the #mc{}
record is persisted to disk.
```
1> iolist_size(term_to_iovec({mc, rabbit_mc_amqp_legacy})).
30
2> iolist_size(term_to_iovec({mc, mc_amqpl})).
17
```
This commit renames the mc modules:
```
ag -l rabbit_mc_amqp_legacy | xargs sed -i 's/rabbit_mc_amqp_legacy/mc_amqpl/g'
ag -l rabbit_mc_amqp | xargs sed -i 's/rabbit_mc_amqp/mc_amqp/g'
ag -l rabbit_mqtt_mc | xargs sed -i 's/rabbit_mqtt_mc/mc_mqtt/g'
```
* mc: make deaths an annotation + fixes
* Fix mc_mqtt protocol_state callback
* Fix test will_delay_node_restart
```
make -C deps/rabbitmq_mqtt ct-v5 t=[mqtt,cluster_size_3]:will_delay_node_restart FULL=1
```
* Bazel run gazelle
* mix format rabbitmqctl.ex
* Ensure ttl annotation is refelected in amqp legacy protocol state
* Fix id access in message store
* Fix rabbit_message_interceptor_SUITE
* dializer fixes
* Fix rabbit:rabbit_message_interceptor_SUITE-mixed
set_annotation/3 should not result in duplicate keys
* Fix MQTT shared_SUITE-mixed
Up to 3.12 non-MQTT publishes were always QoS 1 regardless of delivery_mode.
75a953ce28/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl (L2075-L2076)
From now on, non-MQTT publishes are QoS 1 if durable.
This makes more sense.
The MQTT plugin must send a #basic_message{} to an old node that does
not understand message containers.
* Field content of 'v1_0.data' can be binary
Fix
```
bazel test //deps/rabbitmq_mqtt:shared_SUITE-mixed \
--test_env FOCUS="-group [mqtt,v4,cluster_size_1] -case trace" \
-t- --test_sharding_strategy=disabled
```
* Remove route/2 and implement route/3 for all exchange types.
This removes the route/2 callback from rabbit_exchange_type and
makes route/3 mandatory instead. This is a breaking change and
will require all implementations of exchange types to update their
code, however this is necessary anyway for them to correctly handle
the mc type.
stream filtering fixes
* Translate directly from MQTT to AMQP 0.9.1
* handle undecoded properties in mc_compat
amqpl: put clause in right order
recover death deatails from amqp data
* Replace callback init_amqp with convert_from
* Fix return value of lists:keyfind/3
* Translate directly from AMQP 0.9.1 to MQTT
* Fix MQTT payload size
MQTT payload can be a list when converted from AMQP 0.9.1 for example
First conversions tests
Plus some other conversion related fixes.
bazel
bazel
translate amqp 1.0 null to undefined
mc: property/2 and correlation_id/message_id return type tagged values.
To ensure we can support a variety of types better.
The type type tags are AMQP 1.0 flavoured.
fix death recovery
mc_mqtt: impl new api
Add callbacks to allow protocols to compact data before storage
And make readable if needing to query things repeatedly.
bazel fix
* more decoding
* tracking mixed versions compat
* mc: flip default of `durable` annotation to save some data.
Assuming most messages are durable and that in memory messages suffer less
from persistence overhead it makes sense for a non existent `durable`
annotation to mean durable=true.
* mc conversion tests and tidy up
* mc make x_header unstrict again
* amqpl: death record fixes
* bazel
* amqp -> amqpl conversion test
* Fix crash in mc_amqp:size/1
Body can be a single amqp-value section (instead of
being a list) as shown by test
```
make -C deps/rabbitmq_amqp1_0/ ct-system t=java
```
on branch native-amqp.
* Fix crash in lists:flatten/1
Data can be a single amqp-value section (instead of
being a list) as shown by test
```
make -C deps/rabbitmq_amqp1_0 ct-system t=dotnet:roundtrip_to_amqp_091
```
on branch native-amqp.
* Fix crash in rabbit_writer
Running test
```
make -C deps/rabbitmq_amqp1_0 ct-system t=dotnet:roundtrip_to_amqp_091
```
on branch native-amqp resulted in the following crash:
```
crasher:
initial call: rabbit_writer:enter_mainloop/2
pid: <0.711.0>
registered_name: []
exception error: bad argument
in function size/1
called as size([<<0>>,<<"Sw">>,[<<160,2>>,<<"hi">>]])
*** argument 1: not tuple or binary
in call from rabbit_binary_generator:build_content_frames/7 (rabbit_binary_generator.erl, line 89)
in call from rabbit_binary_generator:build_simple_content_frames/4 (rabbit_binary_generator.erl, line 61)
in call from rabbit_writer:assemble_frames/5 (rabbit_writer.erl, line 334)
in call from rabbit_writer:internal_send_command_async/3 (rabbit_writer.erl, line 365)
in call from rabbit_writer:handle_message/2 (rabbit_writer.erl, line 265)
in call from rabbit_writer:handle_message/3 (rabbit_writer.erl, line 232)
in call from rabbit_writer:mainloop1/2 (rabbit_writer.erl, line 223)
```
because #content.payload_fragments_rev is currently supposed to
be a flat list of binaries instead of being an iolist.
This commit fixes this crash inefficiently by calling
iolist_to_binary/1. A better solution would be to allow AMQP legacy's #content.payload_fragments_rev
to be an iolist.
* Add accidentally deleted line back
* mc: optimise mc_amqp internal format
By removint the outer records for message and delivery annotations
as well as application properties and footers.
* mc: optimis mc_amqp map_add by using upsert
* mc: refactoring and bug fixes
* mc_SUITE routingheader assertions
* mc remove serialize/1 callback as only used by amqp
* mc_amqp: avoid returning a nested list from protocol_state
* test and bug fix
* move infer_type to mc_util
* mc fixes and additiona assertions
* Support headers exchange routing for MQTT messages
When a headers exchange is bound to the MQTT topic exchange, routing
will be performend based on both MQTT topic (by the topic exchange) and
MQTT User Property (by the headers exchange).
This combines the best worlds of both MQTT 5.0 and AMQP 0.9.1 and
enables powerful routing topologies.
When the User Property contains the same name multiple times, only the
last name (and value) will be considered by the headers exchange.
* Fix crash when sending from stream to amqpl
When publishing a message via the stream protocol and consuming it via
AMQP 0.9.1, the following crash occurred prior to this commit:
```
crasher:
initial call: rabbit_channel:init/1
pid: <0.818.0>
registered_name: []
exception exit: {{badmatch,undefined},
[{rabbit_channel,handle_deliver0,4,
[{file,"rabbit_channel.erl"},
{line,2728}]},
{lists,foldl,3,[{file,"lists.erl"},{line,1594}]},
{rabbit_channel,handle_cast,2,
[{file,"rabbit_channel.erl"},
{line,728}]},
{gen_server2,handle_msg,2,
[{file,"gen_server2.erl"},{line,1056}]},
{proc_lib,wake_up,3,
[{file,"proc_lib.erl"},{line,251}]}]}
```
This commit first gives `mc:init/3` the chance to set exchange and
routing_keys annotations.
If not set, `rabbit_stream_queue` will set these annotations assuming
the message was originally published via the stream protocol.
* Support consistent hash exchange routing for MQTT 5.0
When a consistent hash exchange is bound to the MQTT topic exchange,
MQTT 5.0 messages can be routed to queues consistently based on the
Correlation-Data in the PUBLISH packet.
* Convert MQTT 5.0 User Property
* to AMQP 0.9.1 headers
* from AMQP 0.9.1 headers
* to AMQP 1.0 application properties and message annotations
* from AMQP 1.0 application properties and message annotations
* Make use of Annotations in mc_mqtt:protocol_state/2
mc_mqtt:protocol_state/2 includes Annotations as parameter.
It's cleaner to make use of these Annotations when computing the
protocol state instead of relying on the caller (rabbitmq_mqtt_processor)
to compute the protocol state.
* Enforce AMQP 0.9.1 field name length limit
The AMQP 0.9.1 spec prohibits field names longer than 128 characters.
Therefore, when converting AMQP 1.0 message annotations, application
properties or MQTT 5.0 User Property to AMQP 0.9.1 headers, drop any
names longer than 128 characters.
* Fix type specs
Apply feedback from Michael Davis
Co-authored-by: Michael Davis <mcarsondavis@gmail.com>
* Add mc_mqtt unit test suite
Implement mc_mqtt:x_header/2
* Translate indicator that payload is UTF-8 encoded
when converting between MQTT 5.0 and AMQP 1.0
* Translate single amqp-value section from AMQP 1.0 to MQTT
Convert to a text representation, if possible, and indicate to MQTT
client that the payload is UTF-8 encoded. This way, the MQTT client will
be able to parse the payload.
If conversion to text representation is not possible, encode the payload
using the AMQP 1.0 type system and indiate the encoding via Content-Type
message/vnd.rabbitmq.amqp.
This Content-Type is not registered.
Type "message" makes sense since it's a message.
Vendor tree "vnd.rabbitmq.amqp" makes sense since merely subtype "amqp" is not
registered.
* Fix payload conversion
* Translate Response Topic between MQTT and AMQP
Translate MQTT 5.0 Response Topic to AMQP 1.0 reply-to address and vice
versa.
The Response Topic must be a UTF-8 encoded string.
This commit re-uses the already defined RabbitMQ target addresses:
```
"/topic/" RK Publish to amq.topic with routing key RK
"/exchange/" X "/" RK Publish to exchange X with routing key RK
```
By default, the MQTT topic exchange is configure dto be amq.topic using
the 1st target address.
When an operator modifies the mqtt.exchange, the 2nd target address is
used.
* Apply PR feedback
and fix formatting
Co-authored-by: Michael Davis <mcarsondavis@gmail.com>
* tidy up
* Add MQTT message_containers test
* consistent hash exchange: avoid amqp legacy conversion
When hashing on a header value.
* Avoid converting to amqp legacy when using exchange federation
* Fix test flake
* test and dialyzer fixes
* dialyzer fix
* Add MQTT protocol interoperability tests
Test receiving from and sending to MQTT 5.0 and
* AMQP 0.9.1
* AMQP 1.0
* STOMP
* Streams
* Regenerate portions of deps/rabbit/app.bzl with gazelle
I'm not exactly sure how this happened, but gazell seems to have been
run with an older version of the rules_erlang gazelle extension at
some point. This caused generation of a structure that is no longer
used. This commit updates the structure to the current pattern.
* mc: refactoring
* mc_amqpl: handle delivery annotations
Just in case they are included.
Also use iolist_to_iovec to create flat list of binaries when
converting from amqp with amqp encoded payload.
---------
Co-authored-by: David Ansari <david.ansari@gmx.de>
Co-authored-by: Michael Davis <mcarsondavis@gmail.com>
Co-authored-by: Rin Kuryloski <kuryloskip@vmware.com>