API refactor

This commit is contained in:
kjnilsson 2017-02-17 17:39:01 +00:00
parent 0b70bcd129
commit 401d2dae4c
6 changed files with 278 additions and 223 deletions

161
deps/amqp10_client/src/amqp10_client.erl vendored Normal file
View File

@ -0,0 +1,161 @@
-module(amqp10_client).
-include("amqp10_client.hrl").
-include_lib("amqp10_common/include/amqp10_framing.hrl").
-export([
open_connection/1,
open_connection/2,
close_connection/1,
begin_session/1,
begin_session_sync/1,
begin_session_sync/2,
end_session/1,
attach_sender_link/3,
attach_sender_link/4,
attach_receiver_link/3,
attach_receiver_link/4,
send_msg/2,
accept_msg/2,
flow_link_credit/2,
link_handle/1,
get_msg/1,
get_msg/2
% get_msg/1,
% get_msg/2,
]).
-define(DEFAULT_TIMEOUT, 5000).
-record(link_ref, {role :: sender | receiver,session :: pid(),
link_handle :: non_neg_integer(), link_name :: binary()}).
-opaque link_ref() :: #link_ref{}.
-export_type([link_ref/0
]).
-spec open_connection(
inet:socket_address() | inet:hostname(),
inet:port_number()) -> supervisor:startchild_ret().
open_connection(Addr, Port) ->
open_connection(#{address => Addr, port => Port, notify => self()}).
-spec open_connection(amqp10_client_connection:connection_config()) ->
supervisor:startchild_ret().
open_connection(ConnectionConfig0) ->
Notify = maps:get(notify, ConnectionConfig0, self()),
amqp10_client_connection:open(ConnectionConfig0#{notify => Notify}).
-spec close_connection(pid()) -> ok.
close_connection(Pid) ->
amqp10_client_connection:close(Pid, none).
-spec begin_session(pid()) -> supervisor:startchild_ret().
begin_session(Connection) when is_pid(Connection) ->
amqp10_client_connection:begin_session(Connection).
-spec begin_session_sync(pid()) ->
supervisor:startchild_ret() | session_timeout.
begin_session_sync(Connection) when is_pid(Connection) ->
begin_session_sync(Connection, ?DEFAULT_TIMEOUT).
-spec begin_session_sync(pid(), non_neg_integer()) ->
supervisor:startchild_ret() | session_timeout.
begin_session_sync(Connection, Timeout) when is_pid(Connection) ->
case begin_session(Connection) of
{ok, Session} ->
receive
{amqp10_event, {session, Session, begun}} ->
{ok, Session};
{amqp10_event, {session, Session, {error, Err}}} ->
{error, Err}
after Timeout -> session_timeout
end;
Ret -> Ret
end.
-spec end_session(pid()) -> ok.
end_session(Pid) ->
gen_fsm:send_event(Pid, 'end').
-spec attach_sender_link(pid(), binary(), binary()) ->
{ok, amqp10_client_link:link_ref()}.
attach_sender_link(Session, Name, Target) ->
% mixed should work with any type of msg
attach_sender_link(Session, Name, Target, mixed).
-spec attach_sender_link(pid(), binary(), binary(),
amqp10_client_session:snd_settle_mode()) ->
{ok, amqp10_client_link:link_ref()}.
attach_sender_link(Session, Name, Target, SettleMode) ->
AttachArgs = #{name => Name,
role => {sender, #{address => Target}},
snd_settle_mode => SettleMode,
rcv_settle_mode => first},
{ok, Attach} = amqp10_client_session:attach(Session, AttachArgs),
{ok, make_link_ref(sender, Session, Name, Attach)}.
-spec attach_receiver_link(pid(), binary(), binary()) ->
{ok, amqp10_client_link:link_ref()}.
attach_receiver_link(Session, Name, Source) ->
attach_receiver_link(Session, Name, Source, settled).
-spec attach_receiver_link(pid(), binary(), binary(),
amqp10_client_session:snd_settle_mode()) ->
{ok, amqp10_client_link:link_ref()}.
attach_receiver_link(Session, Name, Source, SettleMode) ->
AttachArgs = #{name => Name,
role => {receiver, #{address => Source}, self()},
snd_settle_mode => SettleMode,
rcv_settle_mode => first},
{ok, Attach} = amqp10_client_session:attach(Session, AttachArgs),
{ok, make_link_ref(receiver, Session, Name, Attach)}.
-spec flow_link_credit(link_ref(), non_neg_integer()) -> ok.
flow_link_credit(#link_ref{role = receiver, session = Session,
link_handle = Handle}, Credit) ->
Flow = #'v1_0.flow'{link_credit = {uint, Credit}},
ok = amqp10_client_session:flow(Session, Handle, Flow).
%%% messages
% Returns ok for "async" transfers when messages are send with settled=true
% else it returns the delivery state from the disposition
% TODO: timeouts
-spec send_msg(link_ref(), amqp10_msg:amqp10_msg()) ->
{ok, non_neg_integer()} | {error, insufficient_credit | link_not_found}.
send_msg(#link_ref{role = sender, session = Session,
link_handle = Handle}, Msg0) ->
Msg = amqp10_msg:set_handle(Handle, Msg0),
amqp10_client_session:transfer(Session, Msg, ?DEFAULT_TIMEOUT).
-spec accept_msg(link_ref(), amqp10_msg:amqp10_msg()) -> ok.
accept_msg(#link_ref{role = receiver, session = Session}, Msg) ->
DeliveryId = amqp10_msg:delivery_id(Msg),
amqp10_client_session:disposition(Session, receiver, DeliveryId,
DeliveryId, true, accepted).
get_msg(LinkRef) ->
get_msg(LinkRef, ?DEFAULT_TIMEOUT).
get_msg(#link_ref{role = receiver, session = Session, link_handle = Handle},
Timeout) ->
%flow 1
Flow = #'v1_0.flow'{link_credit = {uint, 1}},
ok = amqp10_client_session:flow(Session, Handle, Flow),
% wait for transfer
receive
{amqp10_msg, Handle, Message} -> {ok, Message}
after Timeout ->
{error, timeout}
end.
link_handle(#link_ref{link_handle = Handle}) -> Handle.
%%% Helpers
make_link_ref(Role, Session, Name, Handle) ->
#link_ref{role = Role, session = Session, link_name = Name,
link_handle = Handle}.

View File

@ -8,10 +8,6 @@
%% Public API.
-export([
open/1,
open/2,
open_sync/1,
open_sync/2,
close/1,
close/2
]).
@ -19,7 +15,7 @@
-export([start_link/2,
socket_ready/2,
protocol_header_received/5,
begin_session/2,
begin_session/1,
heartbeat/1
]).
@ -72,27 +68,6 @@
%% Public API.
%% -------------------------------------------------------------------
-spec open(
inet:socket_address() | inet:hostname(),
inet:port_number()) -> supervisor:startchild_ret().
open(Addr, Port) ->
open(#{address => Addr, port => Port, notify => self()}).
-spec open_sync(connection_config()) ->
supervisor:startchild_ret() | connection_timeout.
open_sync(Config) ->
open_sync(Config, ?DEFAULT_TIMEOUT).
-spec open_sync(connection_config(), timeout()) ->
supervisor:startchild_ret() | connection_timeout.
open_sync(Config, Timeout) ->
{ok, Conn} = open(Config),
receive
{opened, Conn} -> {ok, Conn}
after Timeout -> connection_timeout
end.
-spec open(connection_config()) -> supervisor:startchild_ret().
open(Config) ->
%% Start the supervision tree dedicated to that connection. It
@ -115,10 +90,6 @@ open(Config) ->
Error
end.
-spec close(pid()) -> ok.
close(Pid) ->
close(Pid, none).
-spec close(pid(), {amqp10_client_types:amqp_error()
| amqp10_client_types:connection_error(), binary()} | none) -> ok.
close(Pid, Reason) ->
@ -145,10 +116,10 @@ socket_ready(Pid, Socket) ->
protocol_header_received(Pid, Protocol, Maj, Min, Rev) ->
gen_fsm:send_event(Pid, {protocol_header_received, Protocol, Maj, Min, Rev}).
-spec begin_session(pid(), boolean()) -> supervisor:startchild_ret().
-spec begin_session(pid()) -> supervisor:startchild_ret().
begin_session(Pid, Notify) ->
gen_fsm:sync_send_all_state_event(Pid, {begin_session, Notify}).
begin_session(Pid) ->
gen_fsm:sync_send_all_state_event(Pid, begin_session).
heartbeat(Pid) ->
gen_fsm:send_event(Pid, heartbeat).
@ -213,8 +184,8 @@ open_sent(#'v1_0.open'{max_frame_size = MFSz, idle_time_out = Timeout},
State1 = State#state{config =
Config#{outgoing_max_frame_size => unpack(MFSz)}},
State2 = lists:foldr(
fun({From, Notify}, S0) ->
{Ret, S2} = handle_begin_session(From, Notify, S0),
fun(From, S0) ->
{Ret, S2} = handle_begin_session(From, S0),
_ = gen_fsm:reply(From, Ret),
S2
end, State1, PendingSessionReqs),
@ -258,18 +229,18 @@ handle_event({set_other_procs, OtherProcs}, StateName, State) ->
handle_event(_Event, StateName, State) ->
{next_state, StateName, State}.
handle_sync_event({begin_session, Notify}, From, opened, State) ->
{Ret, State1} = handle_begin_session(From, Notify, State),
handle_sync_event(begin_session, From, opened, State) ->
{Ret, State1} = handle_begin_session(From, State),
{reply, Ret, opened, State1};
handle_sync_event({begin_session, Notify}, From, StateName,
handle_sync_event(begin_session, From, StateName,
#state{pending_session_reqs = PendingSessionReqs} = State)
when StateName =/= close_sent ->
%% The caller already asked for a new session but the connection
%% isn't fully opened. Let's queue this request until the connection
%% is ready.
State1 = State#state{pending_session_reqs = [{From, Notify} | PendingSessionReqs]},
State1 = State#state{pending_session_reqs = [From | PendingSessionReqs]},
{next_state, StateName, State1};
handle_sync_event({begin_session, _Notify}, _, StateName, State) ->
handle_sync_event(begin_session, _From, StateName, State) ->
{reply, {error, connection_closed}, StateName, State};
handle_sync_event(_Event, _From, StateName, State) ->
Reply = ok,
@ -292,11 +263,11 @@ code_change(_OldVsn, StateName, State, _Extra) ->
%% Internal functions.
%% -------------------------------------------------------------------
handle_begin_session({FromPid, _Ref}, Notify,
handle_begin_session({FromPid, _Ref},
#state{sessions_sup = Sup, reader = Reader,
next_channel = Channel,
config = Config} = State) ->
Ret = supervisor:start_child(Sup, [FromPid, Notify, Channel, Reader, Config]),
Ret = supervisor:start_child(Sup, [FromPid, Channel, Reader, Config]),
State1 = case Ret of
{ok, _} -> State#state{next_channel = Channel + 1};
_ -> State
@ -361,14 +332,12 @@ send_heartbeat(#state{socket = Socket}) ->
gen_tcp:send(Socket, Frame).
notify_opened(#{notify := Pid}) ->
Pid ! {opened, self()},
ok;
notify_opened(_Config) -> ok.
Pid ! amqp10_event(opened),
ok.
notify_closed(#{notify := Pid}, Reason) ->
Pid ! {closed, self(), Reason},
ok;
notify_closed(_Config, _Reason) -> ok.
Pid ! amqp10_event({closed, Reason}),
ok.
start_heartbeat_timer(Timeout) ->
timer:apply_after(Timeout, ?MODULE, heartbeat, [self()]).
@ -411,3 +380,6 @@ translate_err(#'v1_0.error'{condition = Cond, description = Desc}) ->
end,
{Err, unpack(Desc)}.
amqp10_event(Evt) ->
{amqp10_event, {connection, self(), Evt}}.

View File

@ -4,98 +4,5 @@
-include_lib("amqp10_common/include/amqp10_framing.hrl").
-export([
sender/3,
sender/4,
receiver/3,
receiver/4,
send/2,
get_one/1,
get_one/2,
accept/2,
flow_credit/2,
link_handle/1
]).
-define(DEFAULT_TIMEOUT, 5000).
-record(link_ref, {role :: sender | receiver,session :: pid(),
link_handle :: non_neg_integer(), link_name :: binary()}).
-opaque link_ref() :: #link_ref{}.
-export_type([link_ref/0
]).
link_handle(#link_ref{link_handle = Handle}) -> Handle.
get_one(LinkRef) ->
get_one(LinkRef, ?DEFAULT_TIMEOUT).
get_one(#link_ref{role = receiver, session = Session, link_handle = Handle},
Timeout) ->
%flow 1
Flow = #'v1_0.flow'{link_credit = {uint, 1}},
ok = amqp10_client_session:flow(Session, Handle, Flow),
% wait for transfer
receive
{message, Handle, Message} -> {amqp_msg, Message}
after Timeout ->
{error, timeout}
end.
-spec flow_credit(link_ref(), non_neg_integer()) -> ok.
flow_credit(#link_ref{role = receiver, session = Session,
link_handle = Handle}, Credit) ->
Flow = #'v1_0.flow'{link_credit = {uint, Credit}},
ok = amqp10_client_session:flow(Session, Handle, Flow).
% Returns ok for "async" transfers when messages are send with settled=true
% else it returns the delivery state from the disposition
% TODO: timeouts
-spec send(link_ref(), amqp10_msg:amqp10_msg()) ->
{ok, non_neg_integer()} | {error, insufficient_credit | link_not_found}.
send(#link_ref{role = sender, session = Session,
link_handle = Handle}, Msg0) ->
Msg = amqp10_msg:set_handle(Handle, Msg0),
amqp10_client_session:transfer(Session, Msg, ?DEFAULT_TIMEOUT).
-spec accept(link_ref(), amqp10_msg:amqp10_msg()) -> ok.
accept(#link_ref{role = receiver, session = Session}, Msg) ->
DeliveryId = amqp10_msg:delivery_id(Msg),
amqp10_client_session:disposition(Session, receiver, DeliveryId, DeliveryId, true,
accepted).
-spec sender(pid(), binary(), binary()) -> {ok, link_ref()}.
sender(Session, Name, Target) ->
sender(Session, Name, Target, mixed). % mixed should work with any type of msg
-spec sender(pid(), binary(), binary(),
amqp10_client_session:snd_settle_mode()) -> {ok, link_ref()}.
sender(Session, Name, Target, SettleMode) ->
AttachArgs = #{name => Name,
role => {sender, #{address => Target}},
snd_settle_mode => SettleMode,
rcv_settle_mode => first},
{ok, Attach} = amqp10_client_session:attach(Session, AttachArgs),
{ok, #link_ref{role = sender, session = Session, link_name = Name,
link_handle = Attach}}.
-spec receiver(pid(), binary(), binary()) -> {ok, link_ref()}.
receiver(Session, Name, Source) ->
receiver(Session, Name, Source, settled).
-spec receiver(pid(), binary(), binary(),
amqp10_client_session:snd_settle_mode()) -> {ok, link_ref()}.
receiver(Session, Name, Source, SettleMode) ->
AttachArgs = #{name => Name,
role => {receiver, #{address => Source}, self()},
snd_settle_mode => SettleMode,
rcv_settle_mode => first},
{ok, Attach} = amqp10_client_session:attach(Session, AttachArgs),
{ok, #link_ref{role = receiver, session = Session, link_name = Name,
link_handle = Attach}}.

View File

@ -17,7 +17,7 @@
]).
%% Private API.
-export([start_link/5,
-export([start_link/4,
socket_ready/2
]).
@ -83,8 +83,7 @@
}).
-record(state,
{owner :: pid(),
channel :: pos_integer(),
{channel :: pos_integer(),
remote_channel :: pos_integer() | undefined,
next_incoming_id = 0 :: transfer_id(),
incoming_window = ?MAX_SESSION_WINDOW_SIZE :: non_neg_integer(),
@ -106,7 +105,7 @@
% can reference transfers for many different links
unsettled = #{} :: #{transfer_id() => {link_handle(), any()}}, %TODO: refine as FsmRef
incoming_unsettled = #{} :: #{transfer_id() => link_handle()},
notify :: boolean()
notify :: pid()
}).
@ -119,7 +118,7 @@
%% The connection process is responsible for allocating a channel
%% number and contact the sessions supervisor to start a new session
%% process.
amqp10_client_connection:begin_session(Connection, false).
amqp10_client_connection:begin_session(Connection).
-spec begin_sync(pid()) -> supervisor:startchild_ret().
begin_sync(Connection) ->
@ -128,7 +127,7 @@ begin_sync(Connection) ->
-spec begin_sync(pid(), non_neg_integer()) ->
supervisor:startchild_ret() | session_timeout.
begin_sync(Connection, Timeout) ->
{ok, Session} = amqp10_client_connection:begin_session(Connection, true),
{ok, Session} = amqp10_client_connection:begin_session(Connection),
receive
{session_begin, Session} -> {ok, Session}
after Timeout -> session_timeout
@ -152,7 +151,7 @@ flow(Session, Handle, Flow) ->
gen_fsm:send_event(Session, {flow, Handle, Flow}).
-spec disposition(pid(), link_role(), transfer_id(), transfer_id(), boolean(),
amqp10_client_types:delivery_state()) -> ok.
amqp10_client_types:delivery_state()) -> ok.
disposition(Session, Role, First, Last, Settled, DeliveryState) ->
gen_fsm:sync_send_event(Session, {disposition, Role, First, Last, Settled,
DeliveryState}).
@ -163,8 +162,8 @@ disposition(Session, Role, First, Last, Settled, DeliveryState) ->
%% Private API.
%% -------------------------------------------------------------------
start_link(From, Notify, Channel, Reader, ConnConfig) ->
gen_fsm:start_link(?MODULE, [From, Notify, Channel, Reader, ConnConfig], []).
start_link(From, Channel, Reader, ConnConfig) ->
gen_fsm:start_link(?MODULE, [From, Channel, Reader, ConnConfig], []).
-spec socket_ready(pid(), gen_tcp:socket()) -> ok.
@ -175,10 +174,9 @@ socket_ready(Pid, Socket) ->
%% gen_fsm callbacks.
%% -------------------------------------------------------------------
init([From, Notify, Channel, Reader, ConnConfig]) ->
init([FromPid, Channel, Reader, ConnConfig]) ->
amqp10_client_frame_reader:register_session(Reader, self(), Channel),
State = #state{owner = From, channel = Channel, reader = Reader,
notify = Notify,
State = #state{notify = FromPid, channel = Channel, reader = Reader,
connection_config = ConnConfig},
{ok, unmapped, State}.
@ -206,7 +204,7 @@ begin_sent(#'v1_0.begin'{remote_channel = {ushort, RemoteChannel},
send_attach(fun send/2, Attach, From, S)
end, State1, EARs),
ok = notify_session_begin(State2),
ok = notify_session_begun(State2),
{next_state, mapped, State2#state{early_attach_requests = [],
next_incoming_id = NOI,
@ -320,7 +318,7 @@ mapped({#'v1_0.transfer'{handle = {uint, InHandle},
Msg = decode_as_msg(Transfer, Payload),
% deliver to the registered receiver process
TargetPid ! {message, OutHandle, Msg},
TargetPid ! {amqp10_msg, OutHandle, Msg},
% stash the DeliveryId - not sure for what yet
Unsettled = case Settled of
@ -349,7 +347,7 @@ mapped(#'v1_0.disposition'{role = true, settled = true, first = {uint, First},
case Acc of
#{Id := {_Handle, Receiver}} ->
S = translate_delivery_state(DeliveryState),
ok = notify_disposition(Receiver, {Id, S}),
ok = notify_disposition(Receiver, {S, Id}),
maps:remove(Id, Acc);
_ -> Acc
end
@ -643,13 +641,12 @@ translate_role(sender) -> false;
translate_role(receiver) -> true.
notify_session_begin(#state{owner = Owner, notify = true}) ->
Owner ! {session_begin, self()},
ok;
notify_session_begin(_State) -> ok.
notify_session_begun(#state{notify = Pid}) ->
Pid ! amqp10_event(begun),
ok.
notify_disposition({Pid, _}, N) ->
Pid ! {disposition, N},
Pid ! {amqp10_disposition, N},
ok.
book_transfer_send(#link{output_handle = Handle} = Link,
@ -701,6 +698,8 @@ decode_as_msg(Transfer, Payload) ->
Records = rabbit_amqp1_0_framing:decode_bin(Payload),
amqp10_msg:from_amqp_records([Transfer | Records]).
amqp10_event(Evt) ->
{amqp10_event, {session, self(), Evt}}.

View File

@ -35,13 +35,25 @@
illegal_state | frame_size_too_small.
-type connection_error() :: connection_forced | framing_error | redirect.
-type session_error() :: atom(). % TODO
-type link_error() :: atom(). % TODO
-type connection_event_detail() :: opened | {closed, Reason::any()} | {error, {connection_error(), any()}}.
-type session_event_detail() :: begun | ended | {error, {session_error(), any()}}.
-type link_event_detail() :: attached | detached | {error, {link_error(), any()}}.
-type amqp10_event_detail() :: {connection, pid(), connection_event_detail()} |
{session, pid(), session_event_detail()} |
{link, amqp10_client_link:link_ref(), link_event_detail()}.
-type amqp10_event() :: {amqp10_event, amqp10_event_detail()}.
-export_type([amqp10_performative/0, channel/0,
source/0, target/0, amqp10_msg_record/0,
delivery_state/0, amqp_error/0, connection_error/0
delivery_state/0, amqp_error/0, connection_error/0,
amqp10_event_detail/0, amqp10_event/0
]).
unpack(undefined) -> undefined;
unpack({_, Value}) -> Value;
unpack(Value) -> Value.

View File

@ -148,26 +148,30 @@ open_close_connection(Config) ->
OpnConf = #{address => Hostname, port => Port,
notify => self(),
container_id => <<"open_close_connection_container">>},
{ok, Connection} = amqp10_client_connection:open(Hostname, Port),
{ok, Connection2} = amqp10_client_connection:open_sync(OpnConf),
ok = amqp10_client_connection:close(Connection2),
ok = amqp10_client_connection:close(Connection).
{ok, Connection} = amqp10_client:open_connection(Hostname, Port),
{ok, Connection2} = amqp10_client:open_connection(OpnConf),
receive
{amqp10_event, {connection, Connection2, opened}} -> ok
after 5000 -> connection_timeout
end,
ok = amqp10_client:close_connection(Connection2),
ok = amqp10_client:close_connection(Connection).
basic_roundtrip(Config) ->
Hostname = ?config(rmq_hostname, Config),
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
ct:pal("Opening connection to ~s:~b", [Hostname, Port]),
{ok, Connection} = amqp10_client_connection:open(Hostname, Port),
{ok, Session} = amqp10_client_session:'begin'(Connection),
{ok, Sender} = amqp10_client_link:sender(Session, <<"banana-sender">>,
{ok, Connection} = amqp10_client:open_connection(Hostname, Port),
{ok, Session} = amqp10_client:begin_session(Connection),
{ok, Sender} = amqp10_client:attach_sender_link(Session, <<"banana-sender">>,
<<"test">>),
Msg = amqp10_msg:new(<<"my-tag">>, <<"banana">>, true),
{ok, _} = amqp10_client_link:send(Sender, Msg),
{ok, Receiver} = amqp10_client_link:receiver(Session, <<"banana-receiver">>,
{ok, _} = amqp10_client:send_msg(Sender, Msg),
{ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"banana-receiver">>,
<<"test">>),
{amqp_msg, OutMsg} = amqp10_client_link:get_one(Receiver),
ok = amqp10_client_session:'end'(Session),
ok = amqp10_client_connection:close(Connection),
{ok, OutMsg} = amqp10_client:get_msg(Receiver),
ok = amqp10_client:end_session(Session),
ok = amqp10_client:close_connection(Connection),
?assertEqual([<<"banana">>], amqp10_msg:body(OutMsg)).
split_transfer(Config) ->
@ -175,65 +179,65 @@ split_transfer(Config) ->
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
ct:pal("Opening connection to ~s:~b", [Hostname, Port]),
Conf = #{address => Hostname, port => Port, max_frame_size => 512},
{ok, Connection} = amqp10_client_connection:open(Conf),
{ok, Session} = amqp10_client_session:'begin'(Connection),
{ok, Connection} = amqp10_client:open_connection(Conf),
{ok, Session} = amqp10_client:begin_session(Connection),
Data = list_to_binary(string:chars(64, 1000)),
{ok, Sender} = amqp10_client_link:sender(Session, <<"data-sender">>,
{ok, Sender} = amqp10_client:attach_sender_link(Session, <<"data-sender">>,
<<"test">>),
Msg = amqp10_msg:new(<<"my-tag">>, Data, true),
{ok, _} = amqp10_client_link:send(Sender, Msg),
{ok, Receiver} = amqp10_client_link:receiver(Session, <<"data-receiver">>,
{ok, _} = amqp10_client:send_msg(Sender, Msg),
{ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"data-receiver">>,
<<"test">>),
{amqp_msg, OutMsg} = amqp10_client_link:get_one(Receiver),
ok = amqp10_client_session:'end'(Session),
ok = amqp10_client_connection:close(Connection),
{ok, OutMsg} = amqp10_client:get_msg(Receiver),
ok = amqp10_client:end_session(Session),
ok = amqp10_client:close_connection(Connection),
?assertEqual([Data], amqp10_msg:body(OutMsg)).
transfer_unsettled(Config) ->
Hostname = ?config(rmq_hostname, Config),
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
Conf = #{address => Hostname, port => Port},
{ok, Connection} = amqp10_client_connection:open(Conf),
{ok, Session} = amqp10_client_session:'begin'(Connection),
{ok, Connection} = amqp10_client:open_connection(Conf),
{ok, Session} = amqp10_client:begin_session(Connection),
Data = list_to_binary(string:chars(64, 1000)),
{ok, Sender} = amqp10_client_link:sender(Session, <<"data-sender">>,
<<"test">>, unsettled),
{ok, Sender} = amqp10_client:attach_sender_link(Session, <<"data-sender">>,
<<"test">>, unsettled),
Msg = amqp10_msg:new(<<"my-tag">>, Data, false),
{ok, DeliveryId} = amqp10_client_link:send(Sender, Msg),
{ok, DeliveryId} = amqp10_client:send_msg(Sender, Msg),
ok = await_disposition(DeliveryId),
{ok, Receiver} = amqp10_client_link:receiver(Session, <<"data-receiver">>,
<<"test">>, unsettled),
{amqp_msg, OutMsg} = amqp10_client_link:get_one(Receiver),
ok = amqp10_client_link:accept(Receiver, OutMsg),
{error, timeout} = amqp10_client_link:get_one(Receiver, 1000),
ok = amqp10_client_session:'end'(Session),
ok = amqp10_client_connection:close(Connection),
{ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"data-receiver">>,
<<"test">>, unsettled),
{ok, OutMsg} = amqp10_client:get_msg(Receiver),
ok = amqp10_client:accept_msg(Receiver, OutMsg),
{error, timeout} = amqp10_client:get_msg(Receiver, 1000),
ok = amqp10_client:end_session(Session),
ok = amqp10_client:close_connection(Connection),
?assertEqual([Data], amqp10_msg:body(OutMsg)).
subscribe(Config) ->
Hostname = ?config(rmq_hostname, Config),
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
QueueName = <<"test-sub">>,
{ok, Connection} = amqp10_client_connection:open(Hostname, Port),
{ok, Session} = amqp10_client_session:'begin'(Connection),
{ok, Sender} = amqp10_client_link:sender(Session, <<"sub-sender">>,
{ok, Connection} = amqp10_client:open_connection(Hostname, Port),
{ok, Session} = amqp10_client:begin_session(Connection),
{ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sub-sender">>,
QueueName),
Msg1 = amqp10_msg:new(<<"my-taggy">>, <<"banana">>, false),
Msg2 = amqp10_msg:new(<<"my-taggy2">>, <<"banana">>, false),
{ok, DeliveryId1} = amqp10_client_link:send(Sender, Msg1),
{ok, DeliveryId1} = amqp10_client:send_msg(Sender, Msg1),
ok = await_disposition(DeliveryId1),
{ok, DeliveryId2} = amqp10_client_link:send(Sender, Msg2),
{ok, DeliveryId2} = amqp10_client:send_msg(Sender, Msg2),
ok = await_disposition(DeliveryId2),
{ok, Receiver} = amqp10_client_link:receiver(Session, <<"sub-receiver">>,
{ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"sub-receiver">>,
QueueName, unsettled),
ok = amqp10_client_link:flow_credit(Receiver, 2),
ok = amqp10_client:flow_link_credit(Receiver, 2),
ok = receive_one(Receiver),
ok = receive_one(Receiver),
timeout = receive_one(Receiver),
ok = amqp10_client_session:'end'(Session),
ok = amqp10_client_connection:close(Connection).
ok = amqp10_client:end_session(Session),
ok = amqp10_client:close_connection(Connection).
insufficient_credit(Config) ->
@ -263,16 +267,16 @@ insufficient_credit(Config) ->
ok = mock_server:set_steps(?config(mock_server, Config), Steps),
Cfg = #{address => Hostname, port => Port, sasl => none},
{ok, Connection} = amqp10_client_connection:open(Cfg),
{ok, Session} = amqp10_client_session:begin_sync(Connection),
{ok, Sender} = amqp10_client_link:sender(Session, <<"mock1-sender">>,
<<"test">>),
Cfg = #{address => Hostname, port => Port, sasl => none, notify => self()},
{ok, Connection} = amqp10_client:open_connection(Cfg),
{ok, Session} = amqp10_client:begin_session_sync(Connection),
{ok, Sender} = amqp10_client:attach_sender_link(Session, <<"mock1-sender">>,
<<"test">>),
Msg = amqp10_msg:new(<<"mock-tag">>, <<"banana">>, true),
{error, insufficient_credit} = amqp10_client_link:send(Sender, Msg),
{error, insufficient_credit} = amqp10_client:send_msg(Sender, Msg),
ok = amqp10_client_session:'end'(Session),
ok = amqp10_client_connection:close(Connection),
ok = amqp10_client:end_session(Session),
ok = amqp10_client:close_connection(Connection),
ok.
@ -281,12 +285,12 @@ outgoing_heartbeat(Config) ->
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
CConf = #{address => Hostname, port => Port,
idle_time_out => 5000},
{ok, Connection} = amqp10_client_connection:open(CConf),
{ok, Connection} = amqp10_client:open_connection(CConf),
timer:sleep(35 * 1000), % activemq defaults to 15s I believe
% check we can still establish a session
{ok, Session} = amqp10_client_session:begin_sync(Connection),
ok = amqp10_client_session:'end'(Session),
ok = amqp10_client_connection:close(Connection).
{ok, Session} = amqp10_client:begin_session_sync(Connection),
ok = amqp10_client:end_session(Session),
ok = amqp10_client:close_connection(Connection).
incoming_heartbeat(Config) ->
Hostname = ?config(mock_host, Config),
@ -308,10 +312,10 @@ incoming_heartbeat(Config) ->
ok = mock_server:set_steps(Mock, Steps),
CConf = #{address => Hostname, port => Port, sasl => none,
idle_time_out => 1000, notify => self()},
{ok, Connection} = amqp10_client_connection:open(CConf),
{ok, Connection} = amqp10_client:open_connection(CConf),
receive
{closed, Connection,
{resource_limit_exceeded, <<"remote idle-time-out">>}} ->
{amqp10_event, {connection, Connection,
{closed, {resource_limit_exceeded, <<"remote idle-time-out">>}}}} ->
ok
after 5000 ->
exit(incoming_heartbeat_assert)
@ -320,19 +324,19 @@ incoming_heartbeat(Config) ->
%%% HELPERS
%%%
receive_one(Receiver) ->
Handle = amqp10_client_link:link_handle(Receiver),
Handle = amqp10_client:link_handle(Receiver),
receive
{message, Handle, Msg} ->
amqp10_client_link:accept(Receiver, Msg)
{amqp10_msg, Handle, Msg} ->
amqp10_client:accept_msg(Receiver, Msg)
after 2000 ->
timeout
end.
await_disposition(DeliveryId) ->
receive
{disposition, {DeliveryId, accepted}} -> ok
{amqp10_disposition, {accepted, DeliveryId}} -> ok
after 3000 -> exit(dispostion_timeout)
end.