# What?

* Support Direct Reply-To for AMQP 1.0
* Compared to AMQP 0.9.1, this PR allows for multiple volatile queues on a single AMQP 1.0 session. Use case: JMS clients can create multiple temporary queues on the same JMS/AMQP session:
  * https://jakarta.ee/specifications/messaging/3.1/apidocs/jakarta.messaging/jakarta/jms/session#createTemporaryQueue()
  * https://jakarta.ee/specifications/messaging/3.1/apidocs/jakarta.messaging/jakarta/jms/jmscontext#createTemporaryQueue()
* Fix missing metrics for Direct Reply-To in AMQP 0.9.1, e.g. `messages_delivered_total`
* Fix missing metrics (even without using Direct Reply-To) in AMQP 0.9.1: If stats level is not `fine`, global metrics `rabbitmq_global_messages_delivered_*` should still be incremented.

# Why?

* Allow for scalable at-most-once RPC reply delivery.
  Example use case: thousands of requesters connect, send a single request, wait for a single reply, and disconnect. This PR won't create any queue and won't write to the metadata store. Therefore, there's less pressure on the metadata store, less pressure on the Management API when listing all queues, less pressure on the metrics subsystem, etc.
* Feature parity with AMQP 0.9.1

# How?

This PR extracts the previously channel-specific Direct Reply-To code into a new queue type: `rabbit_volatile_queue`. "Volatile" describes the semantics, not a use case. It signals non-durable, zero-buffer, at-most-once, may-drop, and "not stored in Khepri."

This new queue type is then used for AMQP 1.0 and AMQP 0.9.1.

Sending to the volatile queue is stateless, as it was previously with Direct Reply-To in AMQP 0.9.1 and as it is for the MQTT QoS 0 queue. This allows for use cases where a single responder replies to e.g. 100k different requesters.

RabbitMQ will automatically grant new link-credit to the responder because the new queue type confirms immediately.

The key gets implicitly checked by the channel/session: If the queue name (including the key) doesn’t exist, the `handle_event` callback for this queue isn’t invoked and therefore no delivery will be sent to the responder.

This commit supports Direct Reply-To across AMQP 1.0 and 0.9.1. In other words, the requester can be an AMQP 1.0 client while the responder is an AMQP 0.9.1 client or vice versa. RabbitMQ will internally convert between AMQP 0.9.1 `reply_to` and AMQP 1.0 `/queues/<queue>` address. The AMQP 0.9.1 `reply_to` property is expected to contain a queue name. That's in line with the AMQP 0.9.1 spec:

> One of the standard message properties is Reply-To, which is designed specifically for carrying the name of reply queues.

Compared to AMQP 0.9.1 where the requester sets the `reply_to` property to `amq.rabbitmq.reply-to` and RabbitMQ modifies this field when forwarding the message to the request queue, in AMQP 1.0 the requester learns about the queue name from the broker at link attachment time. The requester has to set the reply-to property to the server generated queue name. That's because the server isn't allowed to modify the bare message.

During link attachment time, the client has to set certain fields. These fields are expected to be set by the RabbitMQ client libraries.

Here is an Erlang example:

```erl
Source = #{address => undefined,
           durable => none,
           expiry_policy => <<"link-detach">>,
           dynamic => true,
           capabilities => [<<"rabbitmq:volatile-queue">>]},
AttachArgs = #{name => <<"receiver">>,
               role => {receiver, Source, self()},
               snd_settle_mode => settled,
               rcv_settle_mode => first},
{ok, Receiver} = amqp10_client:attach_link(Session, AttachArgs),
AddressReplyQ = receive {amqp10_event, {link, Receiver, {attached, Attach}}} ->
                            #'v1_0.attach'{source = #'v1_0.source'{address = {utf8, Addr}}} = Attach,
                            Addr
                end,
```

The client then sends the message by setting the reply-to address as follows:

```erl
amqp10_client:send_msg(
  SenderRequester,
  amqp10_msg:set_properties(
    #{message_id => <<"my ID">>,
      reply_to => AddressReplyQ},
    amqp10_msg:new(<<"tag">>, <<"request">>))),
```

If the responder attaches to the queue target in the reply-to field, RabbitMQ will check if the requester link is still attached. If the requester detached, the link will be refused.

The responder can also attach to the anonymous null target and set the `to` field to the `reply-to` address.

If RabbitMQ cannot deliver a reply, instead of buffering the reply, RabbitMQ will drop the reply and increment the following Prometheus metric:

```
rabbitmq_global_messages_dead_lettered_maxlen_total{queue_type="rabbit_volatile_queue",dead_letter_strategy="disabled"} 0.0
```

That's in line with the MQTT QoS 0 queue type.

A reply message could be dropped for a variety of reasons:

1. The requester ran out of link-credit. It's therefore the requester's responsibility to grant sufficient link-credit on its receiving link.
2. RabbitMQ isn't allowed to deliver any message due to session flow control. It's the requester's responsibility to keep the session window large enough.
3. The requester doesn't consume messages fast enough, causing TCP backpressure to be applied, or the RabbitMQ AMQP writer proc isn't scheduled quickly enough.
The latter can happen for example if RabbitMQ runs with a single scheduler (is assigned a single CPU core). In either case, RabbitMQ internal flow control causes the volatile queue to drop messages.

Therefore, if high throughput is required while message loss is undesirable, a classic queue should be used instead of a volatile queue since the former buffers messages while the latter doesn't.

The main difference between the volatile queue and the MQTT QoS 0 queue is that the former isn't written to the metadata store.

# Breaking Change

Prior to this PR the following [documented caveat](https://www.rabbitmq.com/docs/4.0/direct-reply-to#limitations) applied:

> If the RPC server publishes with the mandatory flag set then `amq.rabbitmq.reply-to.*` is treated as **not** a queue; i.e. if the server only publishes to this name then the message will be considered "not routed"; a `basic.return` will be sent if the mandatory flag was set.

This PR removes this caveat. This PR introduces the following new behaviour:

> If the RPC server publishes with the mandatory flag set, then `amq.rabbitmq.reply-to.*` is treated as a queue (assuming this queue name is encoded correctly). However, whether the requester is still there to consume the reply is not checked at routing time. In other words, if the RPC server only publishes to this name, then the message will be considered "routed" and RabbitMQ will therefore not send a `basic.return`.

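
To make the changed `mandatory` behaviour concrete, here is a minimal sketch of an AMQP 0.9.1 responder that publishes a reply with the mandatory flag set and then waits briefly for a `basic.return`. It is an illustration under assumptions, not code from this PR: it assumes the separate Erlang AMQP 0.9.1 client (`amqp_client`) is available, and the module name `drt_reply_sketch`, the function `reply/4`, and the one second wait are hypothetical.

```erl
-module(drt_reply_sketch).
-include_lib("amqp_client/include/amqp_client.hrl").
-export([reply/4]).

%% Channel is an open amqp_client channel. ReplyTo and CorrelationId are
%% assumed to come from the properties of the request being answered.
reply(Channel, ReplyTo, CorrelationId, Payload) ->
    %% Ask the channel to forward any basic.return to this process.
    ok = amqp_channel:register_return_handler(Channel, self()),
    Publish = #'basic.publish'{exchange = <<>>,
                               routing_key = ReplyTo,
                               mandatory = true},
    Props = #'P_basic'{correlation_id = CorrelationId},
    ok = amqp_channel:cast(Channel, Publish,
                           #amqp_msg{props = Props, payload = Payload}),
    %% Hypothetical 1 second wait, only to observe whether a return arrives.
    receive
        {#'basic.return'{reply_text = Text}, _ReturnedMsg} ->
            {returned, Text}
    after 1000 ->
        no_return
    end.
```

Under the behaviour described above, a reply published to a correctly encoded `amq.rabbitmq.reply-to.*` name counts as routed, so a responder written like this should no longer rely on a `basic.return` to detect that the requester has gone away.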
# Erlang AMQP 1.0 client

This is an Erlang client for the AMQP 1.0 protocol.
Its primary purpose is to be used in RabbitMQ related projects but it is a generic client that was tested with at least 3 implementations of AMQP 1.0.
If you are looking for an Erlang client for AMQP 0-9-1 (a completely different protocol despite the name), consider the separate RabbitMQ Erlang client for AMQP 0-9-1 instead.

## Project Maturity and Status

This client is used in the cross-protocol version of the RabbitMQ Shovel plugin. It is not 100% feature complete but moderately mature and was tested against at least 3 AMQP 1.0 servers: RabbitMQ, Azure ServiceBus, ActiveMQ.
This client library is not officially supported by VMware at this time.

## Usage

### Connection Settings

The `connection_config` map contains various configuration properties.

```erl
-type address() :: inet:socket_address() | inet:hostname().

-type connection_config() ::
    #{container_id => binary(), % mandatory
      %% must provide a list of addresses or a single address
      addresses => [address()],
      address => address(),
      %% defaults to 5672, mandatory for TLS
      port => inet:port_number(),
      % the dns name of the target host
      % required by some vendors such as Azure ServiceBus
      hostname => binary(),
      tls_opts => {secure_port, [ssl:tls_option()]}, % optional
      notify => pid(), % Pid to receive protocol notifications. Set to self() if not provided
      max_frame_size => non_neg_integer(), % incoming max frame size
      idle_time_out => non_neg_integer(), % heartbeat
      sasl => none | anon | {plain, User :: binary(), Password :: binary()},
      % set this to a negative value to allow a sender to "overshoot" the flow
      % control by this margin
      transfer_limit_margin => 0 | neg_integer()
     }.
```

### TLS

TLS is enabled by setting the `tls_opts` connection configuration property.
Currently the only valid value is `{secure_port, [tls_option]}`, where the port
specified only accepts TLS. It is possible that TLS negotiation as described
in the AMQP 1.0 protocol will be supported in the future. If no value is provided
for `tls_opts` then a plain socket will be used.
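
As an illustration, a TLS-enabled configuration might look like the following sketch. The host name, CA certificate path, container ID, and credentials are placeholders, and port 5671 is assumed only because it is the conventional TLS port for AMQP; the `ssl` options shown (`verify`, `cacertfile`, `server_name_indication`) are standard Erlang/OTP options passed through in the `secure_port` list.

```erl
%% Placeholder host, CA path and credentials; adjust for the target broker.
OpnConf = #{address => "broker.example.com",
            port => 5671,
            hostname => <<"broker.example.com">>,
            container_id => <<"tls-container">>,
            tls_opts => {secure_port,
                         [{verify, verify_peer},
                          {cacertfile, "/path/to/ca_certificate.pem"},
                          {server_name_indication, "broker.example.com"}]},
            sasl => {plain, <<"user">>, <<"secret">>}},
{ok, Connection} = amqp10_client:open_connection(OpnConf).
```
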

### Basic Example

```erl
%% this will connect to a localhost node
{ok, Hostname} = inet:gethostname(),
Port = 5672, %% default AMQP port
User = <<"guest">>,
Password = <<"guest">>,
%% create a configuration map
OpnConf = #{address => Hostname,
            port => Port,
            container_id => <<"test-container">>,
            sasl => {plain, User, Password}},
{ok, Connection} = amqp10_client:open_connection(OpnConf),
{ok, Session} = amqp10_client:begin_session(Connection),
SenderLinkName = <<"test-sender">>,
{ok, Sender} = amqp10_client:attach_sender_link(Session, SenderLinkName, <<"a-queue-maybe">>),
%% wait for credit to be received
receive
    {amqp10_event, {link, Sender, credited}} -> ok
after 2000 ->
    exit(credited_timeout)
end,
%% Create a new message using a delivery-tag, body and indicate
%% its settlement status (true meaning no disposition confirmation
%% will be sent by the receiver).
OutMsg = amqp10_msg:new(<<"my-tag">>, <<"my-body">>, true),
ok = amqp10_client:send_msg(Sender, OutMsg),
ok = amqp10_client:detach_link(Sender),
%% create a receiver link
{ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"test-receiver">>, <<"a-queue-maybe">>),
%% grant some credit to the remote sender but don't auto-renew it
ok = amqp10_client:flow_link_credit(Receiver, 5, never),
%% wait for a delivery
receive
    {amqp10_msg, Receiver, InMsg} -> ok
after 2000 ->
    exit(delivery_timeout)
end,
ok = amqp10_client:close_connection(Connection).
```

### Events

The `amqp10_client` API is mostly asynchronous with respect to the AMQP 1.0
protocol. Functions such as `amqp10_client:open_connection` typically return
after the `Open` frame has been successfully written to the socket rather than
waiting until the remote end returns with their `Open` frame. The client will
notify the caller of various internal/async events using `amqp10_event`
messages. In the example above, when the remote replies with their `Open` frame,
a message of the following form is sent:

```erl
{amqp10_event, {connection, ConnectionPid, opened}}
```

When the connection is closed, an event of the following form is issued:

```erl
{amqp10_event, {connection, ConnectionPid, {closed, Why}}}
```

`Why` could be `normal` or contain a description of an error that occurred
and resulted in the closure of the connection.

Sessions and links emit events in the same format.

```erl
%% success events
{amqp10_event, {connection, ConnectionPid, opened}}
{amqp10_event, {session, SessionPid, begun}}
{amqp10_event, {link, LinkRef, attached}}
%% error events
{amqp10_event, {connection, ConnectionPid, {closed, Why}}}
{amqp10_event, {session, SessionPid, {ended, Why}}}
{amqp10_event, {link, LinkRef, {detached, Why}}}
```

In addition the client may notify the initiator of certain protocol events such as a receiver running out of credit or credit being available to a sender.

```erl
%% no more credit available to sender
{amqp10_event, {link, Sender, credit_exhausted}}
%% sender credit received
{amqp10_event, {link, Sender, credited}}
```

Other events may be declared as necessary. Hence it makes sense for a user
of the client to handle all `{amqp10_event, _}` events to ensure unexpected
messages aren't kept around in the mailbox.
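
One way to follow that advice is a catch-all clause that consumes every `{amqp10_event, _}` message, including event shapes the caller does not know about. The sketch below is only an illustration: the module name `amqp10_event_sketch` and the function `drain_events/1` are hypothetical and not part of the client library.

```erl
-module(amqp10_event_sketch).
-export([drain_events/1]).

%% Removes pending {amqp10_event, _} messages from the caller's mailbox,
%% waiting up to Timeout milliseconds for each one, and returns the events
%% in arrival order. Draining stops early once the connection is closed.
drain_events(Timeout) ->
    drain_events(Timeout, []).

drain_events(Timeout, Acc) ->
    receive
        {amqp10_event, {connection, _Pid, {closed, Why}}} ->
            lists:reverse([{connection_closed, Why} | Acc]);
        {amqp10_event, Event} ->
            %% Catch-all: unknown events are collected rather than left behind.
            drain_events(Timeout, [Event | Acc])
    after Timeout ->
        lists:reverse(Acc)
    end.
```

A long-running process would more likely handle these messages in its own receive loop or in a `gen_server` `handle_info/2` callback; the point is only that the final clause matches any event so that none accumulate.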