rabbitmq-server/deps/amqp10_client
Diana Parra Corbacho 31e5a722c5
Shovels: Optimise amqp10 client messages for shovel usage
AMQP10 shovels don't need the amqp10 message format, the binary
can be translated directly into a message container and also
the other way around. The new amqp10_raw_msg just stores the payload
and information required to create the transfer frame, skipping
a few unnecessary encoding/decoding operations of the AMQP10 sections.
2025-09-24 15:05:17 -04:00
..
include Support SQL filter expressions for streams 2025-06-26 11:56:55 +02:00
src Shovels: Optimise amqp10 client messages for shovel usage 2025-09-24 15:05:17 -04:00
test Detach link for link-level errors 2025-08-27 09:49:22 +02:00
.gitignore Cleanup .gitignore files for the monorepo 2024-06-28 12:00:52 +02:00
CODE_OF_CONDUCT.md Replace files with symlinks 2022-04-15 06:04:29 -07:00
CONTRIBUTING.md Replace files with symlinks 2022-04-15 06:04:29 -07:00
LICENSE Replace @rabbitmq.com addresses with rabbitmq-core@groups.vmware.com 2023-06-20 15:40:13 +04:00
LICENSE-MPL-RabbitMQ Revert drop of Exhibit B on MPL 2.0 2020-07-20 16:49:24 +01:00
Makefile Bump ActiveMQ to v6.1.7 2025-06-23 18:11:42 +02:00
README.md Support AMQP 1.0 natively 2024-02-28 14:15:20 +01:00
elvis.config Add elvis 2017-03-24 10:51:52 +00:00

README.md

Erlang AMQP 1.0 client

This is an Erlang client for the AMQP 1.0 protocol.

Its primary purpose is to be used in RabbitMQ related projects but it is a generic client that was tested with at least 3 implementations of AMQP 1.0.

If you are looking for an Erlang client for AMQP 0-9-1 — a completely different protocol despite the name — consider this one.

Project Maturity and Status

This client is used in the cross-protocol version of the RabbitMQ Shovel plugin. It is not 100% feature complete but moderately mature and was tested against at least 3 AMQP 1.0 servers: RabbitMQ, Azure ServiceBus, ActiveMQ.

This client library is not officially supported by VMware at this time.

Usage

Connection Settings

The connection_config map contains various configuration properties.

-type address :: inet:socket_address() | inet:hostname().

-type connection_config() ::
    #{container_id => binary(), % mandatory
      %% must provide a list of addresses or a single address
      addresses => [address()],
      address => address(), 
      %% defaults to 5672, mandatory for TLS
      port => inet:port_number(),
      % the dns name of the target host
      % required by some vendors such as Azure ServiceBus
      hostname => binary(),
      tls_opts => {secure_port, [ssl:tls_option()]}, % optional
      notify => pid(), % Pid to receive protocol notifications. Set to self() if not provided
      max_frame_size => non_neg_integer(), % incoming max frame size
      idle_time_out => non_neg_integer(), % heartbeat
      sasl => none | anon | {plain, User :: binary(), Password :: binary(),
      % set this to a negative value to allow a sender to "overshoot" the flow
      % control by this margin
      transfer_limit_margin => 0 | neg_integer()}
  }.

TLS

TLS is enabled by setting the tls_opts connection configuration property. Currently the only valid value is {secure_port, [tls_option]} where the port specified only accepts TLS. It is possible that tls negotiation as described in the amqp 1.0 protocol will be supported in the future. If no value is provided for tls_opt then a plain socket will be used.

Basic Example

%% this will connect to a localhost node
{ok, Hostname} = inet:gethostname(),
User = <<"guest">>,
Password = <<"guest">>,
%% create a configuration map
OpnConf = #{address => Hostname,
            port => Port,
            container_id => <<"test-container">>,
            sasl => {plain, User, Password}},
{ok, Connection} = amqp10_client:open_connection(OpnConf),
{ok, Session} = amqp10_client:begin_session(Connection),
SenderLinkName = <<"test-sender">>,
{ok, Sender} = amqp10_client:attach_sender_link(Session, SenderLinkName, <<"a-queue-maybe">>),

%% wait for credit to be received
receive
    {amqp10_event, {link, Sender, credited}} -> ok
after 2000 ->
      exit(credited_timeout)
end.

%% Create a new message using a delivery-tag, body and indicate
%% its settlement status (true meaning no disposition confirmation
%% will be sent by the receiver).
OutMsg = amqp10_msg:new(<<"my-tag">>, <<"my-body">>, true),
ok = amqp10_client:send_msg(Sender, OutMsg),
ok = amqp10_client:detach_link(Sender),

%% create a receiver link
{ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"test-receiver">>, <<"a-queue-maybe">>),

%% grant some credit to the remote sender but don't auto-renew it
ok = amqp10_client:flow_link_credit(Receiver, 5, never),

%% wait for a delivery
receive
    {amqp10_msg, Receiver, InMsg} -> ok
after 2000 ->
      exit(delivery_timeout)
end.

ok = amqp10_client:close_connection(Connection),

Events

The ampq10_client API is mostly asynchronous with respect to the AMQP 1.0 protocol. Functions such as amqp10_client:open_connection typically return after the Open frame has been successfully written to the socket rather than waiting until the remote end returns with their Open frame. The client will notify the caller of various internal/async events using amqp10_event messages. In the example above when the remote replies with their Open frame a message is sent of the following form:

{amqp10_event, {connection, ConnectionPid, opened}}

When the connection is closed an event is issued as such:

{amqp10_event, {connection, ConnectionPid, {closed, Why}}}

Why could be normal or contain a description of an error that occured and resulted in the closure of the connection.

Likewise sessions and links have similar events using a similar format.

%% success events
{amqp10_event, {connection, ConnectionPid, opened}}
{amqp10_event, {session, SessionPid, begun}}
{amqp10_event, {link, LinkRef, attached}}
%% error events
{amqp10_event, {connection, ConnectionPid, {closed, Why}}}
{amqp10_event, {session, SessionPid, {ended, Why}}}
{amqp10_event, {link, LinkRef, {detached, Why}}}

In addition the client may notify the initiator of certain protocol events such as a receiver running out of credit or credit being available to a sender.

%% no more credit available to sender
{amqp10_event, {link, Sender, credit_exhausted}}
%% sender credit received
{amqp10_event, {link, Sender, credited}}

Other events may be declared as necessary, Hence it makes sense for a user of the client to handle all {amqp10_event, _} events to ensure unexpected messages aren't kept around in the mailbox.