The AMQP spec defines:
```
<field name="durable" type="boolean" default="false"/>
```
RabbitMQ 4.0 and 4.1 interpret the durable field as true if not set.
The idea was to favour safety over performance.
This complies with the AMQP spec because the spec allows other target or
node specific defaults for the durable field:
> If the header section is omitted the receiver MUST assume the appropriate
> default values (or the meaning implied by no value being set) for the fields
> within the header unless other target or node specific defaults have otherwise
> been set.
However, some client libraries completely omit the header section if the
app explicitly sets durable=false. This complies with the spec, but it
means that RabbitMQ cannot differentiate between "client app forgot to set
the durable field" and "client lib opted in to an optimisation that omits
the header section".
This is problematic with JMS message selectors where JMS apps can filter
on JMSDeliveryMode. To be able to correctly filter on JMSDeliveryMode,
RabbitMQ needs to know whether the JMS app sent the message as
PERSISTENT or NON_PERSISTENT.
Rather than relying on client libs to always send the header section
including the durable field, this commit makes RabbitMQ comply with the
default value for durable in the AMQP spec.
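The difference between the old and the new interpretation can be sketched as a single function. This is an illustration only, not RabbitMQ's implementation: the module name, the function name, and the representation of the header (`undefined` for an omitted header section, otherwise a map) are assumptions made for the example.

```erlang
-module(durable_default).
-export([effective_durable/2]).

%% Hypothetical header representation: `undefined` when the header
%% section was omitted, otherwise a map that may contain `durable`.
%% Default is the node-specific default: `true` models RabbitMQ 4.0
%% and 4.1, `false` models the AMQP spec default.
-spec effective_durable(undefined | #{durable => boolean()}, boolean()) ->
    boolean().
effective_durable(undefined, Default) ->
    %% Header section omitted: only the default can decide.
    Default;
effective_durable(Header, Default) when is_map(Header) ->
    %% Header present: an explicit value wins over the default.
    maps:get(durable, Header, Default).
```

With a default of `true`, a message whose header section was omitted by a client lib as an optimisation for durable=false is treated as durable. With the spec default of `false`, the same message is treated as non-durable.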
Some client lib maintainers agreed to send the header section, while
other maintainers refused to do so:
https://github.com/Azure/go-amqp/issues/330
https://issues.apache.org/jira/browse/QPIDJMS-608
Likely the AMQP spec was designed to omit the header section when
performance is important, as is the case with durable=false. Omitting
the header section means saving a few bytes per message on the wire and
some marshalling and unmarshalling overhead on both client and server.
Therefore, it's better to push the "safe by default" behaviour from the broker
back to the client libs. Client libs should send messages as durable by
default unless the client app explicitly opts in to send messages as
non-durable. This is also what JMS does: by default, JMS apps send
messages as PERSISTENT:
> The message producer's default delivery mode is PERSISTENT.
Therefore, this commit also makes the AMQP Erlang client send messages as
durable, by default.
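An app that wants non-durable messages can opt out explicitly. The sketch below assumes the amqp10_client library is on the code path and that amqp10_msg:set_headers/2 accepts a durable key; the module and function names are made up for the example.

```erlang
-module(durable_opt_out).
-export([non_durable_msg/2]).

%% Build an unsettled message and explicitly mark it as non-durable.
%% Assumes amqp10_msg:new/3 and amqp10_msg:set_headers/2 from the
%% amqp10_client library.
non_durable_msg(DeliveryTag, Body) ->
    Msg = amqp10_msg:new(DeliveryTag, Body, false),
    amqp10_msg:set_headers(#{durable => false}, Msg).
```

The returned message can be passed to amqp10_client:send_msg/2 like any other message.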
This commit will apply to RabbitMQ 4.2.
It's arguably not a breaking change because in RabbitMQ, message durability
is determined more by the type of queue the message is sent to than by the
durable field of the message:
* Quorum queues and streams store messages durably (fsync or replicate)
no matter what the durable field is
* MQTT QoS 0 queues hold messages in memory no matter what the
durable field is
* Classic queues do not fsync even if the durable field is set to true
In addition, the RabbitMQ AMQP Java library introduced in RabbitMQ 4.0 sends messages with
durable=true:
README.md
Erlang AMQP 1.0 client
This is an Erlang client for the AMQP 1.0 protocol.
Its primary purpose is to be used in RabbitMQ related projects but it is a generic client that was tested with at least 3 implementations of AMQP 1.0.
If you are looking for an Erlang client for AMQP 0-9-1 — a completely different protocol despite the name — consider this one.
Project Maturity and Status
This client is used in the cross-protocol version of the RabbitMQ Shovel plugin. It is not 100% feature complete but moderately mature and was tested against at least 3 AMQP 1.0 servers: RabbitMQ, Azure ServiceBus, ActiveMQ.
This client library is not officially supported by VMware at this time.
Usage
Connection Settings
The connection_config map contains various configuration properties.
    -type address() :: inet:socket_address() | inet:hostname().
    -type connection_config() ::
        #{container_id => binary(), % mandatory
          %% must provide a list of addresses or a single address
          addresses => [address()],
          address => address(),
          %% defaults to 5672, mandatory for TLS
          port => inet:port_number(),
          % the dns name of the target host
          % required by some vendors such as Azure ServiceBus
          hostname => binary(),
          tls_opts => {secure_port, [ssl:tls_option()]}, % optional
          notify => pid(), % Pid to receive protocol notifications. Set to self() if not provided
          max_frame_size => non_neg_integer(), % incoming max frame size
          idle_time_out => non_neg_integer(), % heartbeat
          sasl => none | anon | {plain, User :: binary(), Password :: binary()},
          % set this to a negative value to allow a sender to "overshoot" the flow
          % control by this margin
          transfer_limit_margin => 0 | neg_integer()
         }.
TLS
TLS is enabled by setting the tls_opts connection configuration property.
Currently the only valid value is {secure_port, [tls_option]} where the port
specified only accepts TLS. It is possible that TLS negotiation as described
in the AMQP 1.0 protocol will be supported in the future. If no value is provided
for tls_opts then a plain socket will be used.
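A TLS configuration built from the properties above might look like the following sketch. The module name, the function name, the container id, and the choice of TLS options are assumptions for the example. Port 5671 is the port registered for AMQP over TLS, so adjust it to match the target broker.

```erlang
-module(tls_conn_config).
-export([config/2]).

%% Build a connection_config map for a TLS-only port.
%% Host and CaCertFile are supplied by the caller (hypothetical values).
config(Host, CaCertFile) ->
    #{container_id => <<"tls-example-container">>,
      address => Host,
      %% the port has no usable default for TLS, so set it explicitly
      port => 5671,
      %% verify the server certificate against the given CA bundle
      tls_opts => {secure_port, [{verify, verify_peer},
                                 {cacertfile, CaCertFile}]},
      sasl => anon}.
```

The resulting map is passed to amqp10_client:open_connection/1 in the same way as the plain configuration in the basic example.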
Basic Example
    %% this will connect to a localhost node
    {ok, Hostname} = inet:gethostname(),
    %% default AMQP port
    Port = 5672,
    User = <<"guest">>,
    Password = <<"guest">>,
    %% create a configuration map
    OpnConf = #{address => Hostname,
                port => Port,
                container_id => <<"test-container">>,
                sasl => {plain, User, Password}},
    {ok, Connection} = amqp10_client:open_connection(OpnConf),
    {ok, Session} = amqp10_client:begin_session(Connection),
    SenderLinkName = <<"test-sender">>,
    {ok, Sender} = amqp10_client:attach_sender_link(Session, SenderLinkName, <<"a-queue-maybe">>),
    %% wait for credit to be received
    receive
        {amqp10_event, {link, Sender, credited}} -> ok
    after 2000 ->
        exit(credited_timeout)
    end,
    %% Create a new message using a delivery-tag, body and indicate
    %% its settlement status (true meaning no disposition confirmation
    %% will be sent by the receiver).
    OutMsg = amqp10_msg:new(<<"my-tag">>, <<"my-body">>, true),
    ok = amqp10_client:send_msg(Sender, OutMsg),
    ok = amqp10_client:detach_link(Sender),
    %% create a receiver link
    {ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"test-receiver">>, <<"a-queue-maybe">>),
    %% grant some credit to the remote sender but don't auto-renew it
    ok = amqp10_client:flow_link_credit(Receiver, 5, never),
    %% wait for a delivery
    receive
        {amqp10_msg, Receiver, InMsg} -> ok
    after 2000 ->
        exit(delivery_timeout)
    end,
    ok = amqp10_client:close_connection(Connection).
Events
The amqp10_client API is mostly asynchronous with respect to the AMQP 1.0
protocol. Functions such as amqp10_client:open_connection typically return
after the Open frame has been successfully written to the socket rather than
waiting until the remote end returns with their Open frame. The client will
notify the caller of various internal/async events using amqp10_event
messages. In the example above, when the remote replies with their Open frame,
a message of the following form is sent:
{amqp10_event, {connection, ConnectionPid, opened}}
When the connection is closed an event is issued as such:
{amqp10_event, {connection, ConnectionPid, {closed, Why}}}
Why could be normal or contain a description of an error that occurred
and resulted in the closure of the connection.
Likewise sessions and links have similar events using a similar format.
%% success events
{amqp10_event, {connection, ConnectionPid, opened}}
{amqp10_event, {session, SessionPid, begun}}
{amqp10_event, {link, LinkRef, attached}}
%% error events
{amqp10_event, {connection, ConnectionPid, {closed, Why}}}
{amqp10_event, {session, SessionPid, {ended, Why}}}
{amqp10_event, {link, LinkRef, {detached, Why}}}
In addition the client may notify the initiator of certain protocol events such as a receiver running out of credit or credit being available to a sender.
%% no more credit available to sender
{amqp10_event, {link, Sender, credit_exhausted}}
%% sender credit received
{amqp10_event, {link, Sender, credited}}
Other events may be declared as necessary. Hence it makes sense for a user
of the client to handle all {amqp10_event, _} events to ensure unexpected
messages aren't kept around in the mailbox.
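One way to follow this advice is a catch-all receive clause that removes every pending amqp10_event message from the mailbox. The sketch below is plain Erlang and does not depend on the client library; the module and function names are made up for the example.

```erlang
-module(amqp10_event_drain).
-export([drain/0]).

%% Remove all pending {amqp10_event, _} messages from the calling
%% process's mailbox and return their payloads in arrival order.
%% Messages of any other shape are left in the mailbox.
-spec drain() -> [term()].
drain() ->
    drain([]).

drain(Acc) ->
    receive
        {amqp10_event, Event} ->
            %% catch-all: matches known and not-yet-declared events alike
            drain([Event | Acc])
    after 0 ->
        %% no further matching message is queued right now
        lists:reverse(Acc)
    end.
```

A long-running process would more likely handle these messages in its main receive loop or in a gen_server handle_info/2 clause. The point is only that some clause matches {amqp10_event, _} so that unknown events are consumed.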