Add basic mock_server for testing.
Allow connections without sasl. Add sync version of session:begin
This commit is contained in:
parent
8f9b4092af
commit
3de1e1a95e
|
|
@ -16,7 +16,7 @@
|
|||
-export([start_link/2,
|
||||
socket_ready/2,
|
||||
protocol_header_received/5,
|
||||
begin_session/1
|
||||
begin_session/2
|
||||
]).
|
||||
|
||||
%% gen_fsm callbacks.
|
||||
|
|
@ -37,11 +37,13 @@
|
|||
opened/2,
|
||||
expecting_close_frame/2]).
|
||||
|
||||
-type connection_config() :: #{address => inet:socket_address() | inet:hostname(),
|
||||
port => inet:port_number(),
|
||||
max_frame_size => non_neg_integer(), % TODO constrain to large than 512
|
||||
outgoing_max_frame_size => non_neg_integer() | undefined
|
||||
}.
|
||||
-type connection_config() ::
|
||||
#{address => inet:socket_address() | inet:hostname(),
|
||||
port => inet:port_number(),
|
||||
max_frame_size => non_neg_integer(), % TODO constrain to large than 512
|
||||
outgoing_max_frame_size => non_neg_integer() | undefined,
|
||||
sasl => none | anon | {plain, binary(), binary()} % {plain, User, Pwd}
|
||||
}.
|
||||
|
||||
-record(state,
|
||||
{next_channel = 1 :: pos_integer(),
|
||||
|
|
@ -114,10 +116,10 @@ socket_ready(Pid, Socket) ->
|
|||
protocol_header_received(Pid, Protocol, Maj, Min, Rev) ->
|
||||
gen_fsm:send_event(Pid, {protocol_header_received, Protocol, Maj, Min, Rev}).
|
||||
|
||||
-spec begin_session(pid()) -> supervisor:startchild_ret().
|
||||
-spec begin_session(pid(), boolean()) -> supervisor:startchild_ret().
|
||||
|
||||
begin_session(Pid) ->
|
||||
gen_fsm:sync_send_all_state_event(Pid, begin_session).
|
||||
begin_session(Pid, Notify) ->
|
||||
gen_fsm:sync_send_all_state_event(Pid, {begin_session, Notify}).
|
||||
|
||||
%% -------------------------------------------------------------------
|
||||
%% gen_fsm callbacks.
|
||||
|
|
@ -127,10 +129,16 @@ init([Sup, Config]) ->
|
|||
{ok, expecting_socket, #state{connection_sup = Sup,
|
||||
config = Config}}.
|
||||
|
||||
expecting_socket({socket_ready, Socket}, State) ->
|
||||
expecting_socket({socket_ready, Socket}, State = #state{config = Cfg}) ->
|
||||
State1 = State#state{socket = Socket},
|
||||
ok = gen_tcp:send(Socket, ?SASL_PROTOCOL_HEADER),
|
||||
{next_state, expecting_sasl_protocol_header, State1}.
|
||||
case Cfg of
|
||||
#{sasl := none} ->
|
||||
ok = gen_tcp:send(Socket, ?AMQP_PROTOCOL_HEADER),
|
||||
{next_state, expecting_amqp_protocol_header, State1};
|
||||
_ -> % assume anonymous
|
||||
ok = gen_tcp:send(Socket, ?SASL_PROTOCOL_HEADER),
|
||||
{next_state, expecting_sasl_protocol_header, State1}
|
||||
end.
|
||||
|
||||
expecting_sasl_protocol_header({protocol_header_received, 3, 1, 0, 0}, State) ->
|
||||
{next_state, expecting_sasl_mechanisms, State}.
|
||||
|
|
@ -167,8 +175,8 @@ expecting_open_frame(
|
|||
State = State0#state{config =
|
||||
Config#{outgoing_max_frame_size => unpack(MFSz)}},
|
||||
State3 = lists:foldr(
|
||||
fun(From, State1) ->
|
||||
{Ret, State2} = handle_begin_session(State1),
|
||||
fun({From, Notify}, State1) ->
|
||||
{Ret, State2} = handle_begin_session(From, Notify, State1),
|
||||
_ = gen_fsm:reply(From, Ret),
|
||||
State2
|
||||
end, State, PendingSessionReqs),
|
||||
|
|
@ -201,18 +209,18 @@ handle_event({set_other_procs, OtherProcs}, StateName, State) ->
|
|||
handle_event(_Event, StateName, State) ->
|
||||
{next_state, StateName, State}.
|
||||
|
||||
handle_sync_event(begin_session, _, opened, State) ->
|
||||
{Ret, State1} = handle_begin_session(State),
|
||||
handle_sync_event({begin_session, Notify}, From, opened, State) ->
|
||||
{Ret, State1} = handle_begin_session(From, Notify, State),
|
||||
{reply, Ret, opened, State1};
|
||||
handle_sync_event(begin_session, From, StateName,
|
||||
handle_sync_event({begin_session, Notify}, From, StateName,
|
||||
#state{pending_session_reqs = PendingSessionReqs} = State)
|
||||
when StateName =/= expecting_close_frame ->
|
||||
%% The caller already asked for a new session but the connection
|
||||
%% isn't fully opened. Let's queue this request until the connection
|
||||
%% is ready.
|
||||
State1 = State#state{pending_session_reqs = [From | PendingSessionReqs]},
|
||||
State1 = State#state{pending_session_reqs = [{From, Notify} | PendingSessionReqs]},
|
||||
{next_state, StateName, State1};
|
||||
handle_sync_event(begin_session, _, StateName, State) ->
|
||||
handle_sync_event({begin_session, _Notify}, _, StateName, State) ->
|
||||
{reply, {error, connection_closed}, StateName, State};
|
||||
handle_sync_event(_Event, _From, StateName, State) ->
|
||||
Reply = ok,
|
||||
|
|
@ -235,10 +243,11 @@ code_change(_OldVsn, StateName, State, _Extra) ->
|
|||
%% Internal functions.
|
||||
%% -------------------------------------------------------------------
|
||||
|
||||
handle_begin_session(#state{sessions_sup = Sup, reader = Reader,
|
||||
handle_begin_session({FromPid, _Ref}, Notify,
|
||||
#state{sessions_sup = Sup, reader = Reader,
|
||||
next_channel = Channel,
|
||||
config = Config} = State) ->
|
||||
Ret = supervisor:start_child(Sup, [Channel, Reader, Config]),
|
||||
Ret = supervisor:start_child(Sup, [FromPid, Notify, Channel, Reader, Config]),
|
||||
State1 = case Ret of
|
||||
{ok, _} -> State#state{next_channel = Channel + 1};
|
||||
_ -> State
|
||||
|
|
|
|||
|
|
@ -7,6 +7,8 @@
|
|||
|
||||
%% Public API.
|
||||
-export(['begin'/1,
|
||||
begin_sync/1,
|
||||
begin_sync/2,
|
||||
'end'/1,
|
||||
attach/2,
|
||||
transfer/3,
|
||||
|
|
@ -15,7 +17,7 @@
|
|||
]).
|
||||
|
||||
%% Private API.
|
||||
-export([start_link/3,
|
||||
-export([start_link/5,
|
||||
socket_ready/2
|
||||
]).
|
||||
|
||||
|
|
@ -36,6 +38,7 @@
|
|||
|
||||
-define(MAX_SESSION_WINDOW_SIZE, 65535).
|
||||
-define(DEFAULT_MAX_HANDLE, 16#ffffffff).
|
||||
-define(DEFAULT_TIMEOUT, 5000).
|
||||
-define(INITIAL_OUTGOING_ID, 65535).
|
||||
-define(INITIAL_DELIVERY_COUNT, 0).
|
||||
|
||||
|
|
@ -80,7 +83,8 @@
|
|||
}).
|
||||
|
||||
-record(state,
|
||||
{channel :: pos_integer(),
|
||||
{owner :: pid(),
|
||||
channel :: pos_integer(),
|
||||
remote_channel :: pos_integer() | undefined,
|
||||
next_incoming_id = 0 :: transfer_id(),
|
||||
incoming_window = ?MAX_SESSION_WINDOW_SIZE :: non_neg_integer(),
|
||||
|
|
@ -101,7 +105,8 @@
|
|||
% the unsettled map needs to go in the session state as a disposition
|
||||
% can reference transfers for many different links
|
||||
unsettled = #{} :: #{transfer_id() => {link_handle(), any()}}, %TODO: refine as FsmRef
|
||||
incoming_unsettled = #{} :: #{transfer_id() => link_handle()}
|
||||
incoming_unsettled = #{} :: #{transfer_id() => link_handle()},
|
||||
notify :: boolean()
|
||||
}).
|
||||
|
||||
|
||||
|
|
@ -114,7 +119,20 @@
|
|||
%% The connection process is responsible for allocating a channel
|
||||
%% number and contact the sessions supervisor to start a new session
|
||||
%% process.
|
||||
amqp10_client_connection:begin_session(Connection).
|
||||
amqp10_client_connection:begin_session(Connection, false).
|
||||
|
||||
-spec begin_sync(pid()) -> supervisor:startchild_ret().
|
||||
begin_sync(Connection) ->
|
||||
begin_sync(Connection, ?DEFAULT_TIMEOUT).
|
||||
|
||||
-spec begin_sync(pid(), non_neg_integer()) ->
|
||||
supervisor:startchild_ret() | session_timeout.
|
||||
begin_sync(Connection, Timeout) ->
|
||||
{ok, Session} = amqp10_client_connection:begin_session(Connection, true),
|
||||
receive
|
||||
{session_begin, Session} -> {ok, Session}
|
||||
after Timeout -> session_timeout
|
||||
end.
|
||||
|
||||
-spec 'end'(pid()) -> ok.
|
||||
'end'(Pid) ->
|
||||
|
|
@ -145,8 +163,8 @@ disposition(Session, First, Last, Settled, DeliveryState) ->
|
|||
%% Private API.
|
||||
%% -------------------------------------------------------------------
|
||||
|
||||
start_link(Channel, Reader, ConnConfig) ->
|
||||
gen_fsm:start_link(?MODULE, [Channel, Reader, ConnConfig], []).
|
||||
start_link(From, Notify, Channel, Reader, ConnConfig) ->
|
||||
gen_fsm:start_link(?MODULE, [From, Notify, Channel, Reader, ConnConfig], []).
|
||||
|
||||
-spec socket_ready(pid(), gen_tcp:socket()) -> ok.
|
||||
|
||||
|
|
@ -157,9 +175,10 @@ socket_ready(Pid, Socket) ->
|
|||
%% gen_fsm callbacks.
|
||||
%% -------------------------------------------------------------------
|
||||
|
||||
init([Channel, Reader, ConnConfig]) ->
|
||||
init([From, Notify, Channel, Reader, ConnConfig]) ->
|
||||
amqp10_client_frame_reader:register_session(Reader, self(), Channel),
|
||||
State = #state{channel = Channel, reader = Reader,
|
||||
State = #state{owner = From, channel = Channel, reader = Reader,
|
||||
notify = Notify,
|
||||
connection_config = ConnConfig},
|
||||
{ok, unmapped, State}.
|
||||
|
||||
|
|
@ -180,12 +199,15 @@ begin_sent(#'v1_0.begin'{remote_channel = {ushort, RemoteChannel},
|
|||
incoming_window = {uint, InWindow},
|
||||
outgoing_window = {uint, OutWindow}
|
||||
},
|
||||
#state{early_attach_requests = EARs} = State) ->
|
||||
#state{early_attach_requests = EARs} = State) ->
|
||||
|
||||
State1 = State#state{remote_channel = RemoteChannel},
|
||||
State2 = lists:foldr(fun({From, Attach}, S) ->
|
||||
send_attach(fun send/2, Attach, From, S)
|
||||
end, State1, EARs),
|
||||
|
||||
ok = notify_session_begin(State2),
|
||||
|
||||
{next_state, mapped, State2#state{early_attach_requests = [],
|
||||
next_incoming_id = NOI,
|
||||
remote_incoming_window = InWindow,
|
||||
|
|
@ -366,7 +388,7 @@ mapped({transfer, #'v1_0.transfer'{handle = {uint, OutHandle}} = Transfer0,
|
|||
#{OutHandle := Link} ->
|
||||
Transfer = Transfer0#'v1_0.transfer'{delivery_id = uint(NDI)},
|
||||
ok = send_transfer(Transfer, Parts, State),
|
||||
% TODO look into if erlang will correctly wrap integers durin
|
||||
% TODO look into if erlang will correctly wrap integers during
|
||||
% binary conversion.
|
||||
{reply, ok, mapped, book_transfer_send(Link, State)};
|
||||
_ ->
|
||||
|
|
@ -609,6 +631,10 @@ reverse_translate_delivery_state(modified) -> #'v1_0.modified'{};
|
|||
reverse_translate_delivery_state(released) -> #'v1_0.released'{};
|
||||
reverse_translate_delivery_state(received) -> #'v1_0.received'{}.
|
||||
|
||||
notify_session_begin(#state{owner = Owner, notify = true}) ->
|
||||
Owner ! {session_begin, self()},
|
||||
ok;
|
||||
notify_session_begin(_State) -> ok.
|
||||
|
||||
book_transfer_send(#link{output_handle = Handle} = Link,
|
||||
#state{next_outgoing_id = NOI,
|
||||
|
|
|
|||
|
|
@ -0,0 +1,73 @@
|
|||
-module(mock_server).
|
||||
|
||||
%% API functions
|
||||
-export([start/1,
|
||||
set_steps/2,
|
||||
stop/1,
|
||||
run/1,
|
||||
amqp_step/1,
|
||||
send_amqp_header_step/1,
|
||||
recv_amqp_header_step/1
|
||||
]).
|
||||
|
||||
|
||||
-include("amqp10_client.hrl").
|
||||
|
||||
start(Port) ->
|
||||
{ok, LSock} = gen_tcp:listen(Port, [binary, {packet, 0}, {active, false}]),
|
||||
{LSock, spawn(?MODULE, run, [LSock])}.
|
||||
|
||||
set_steps({_Sock, Pid}, Steps) ->
|
||||
Pid ! {set_steps, Steps},
|
||||
ok.
|
||||
|
||||
stop({S, P}) ->
|
||||
P ! close,
|
||||
gen_tcp:close(S),
|
||||
exit(P, stop).
|
||||
|
||||
run(Listener) ->
|
||||
receive
|
||||
{set_steps, Steps} ->
|
||||
{ok, Sock} = gen_tcp:accept(Listener),
|
||||
lists:foreach(fun(S) -> S(Sock) end, Steps),
|
||||
receive
|
||||
close -> ok
|
||||
end
|
||||
end.
|
||||
|
||||
|
||||
send(Socket, Ch, Records) ->
|
||||
Encoded = [rabbit_amqp1_0_framing:encode_bin(R) || R <- Records],
|
||||
Frame = rabbit_amqp1_0_binary_generator:build_frame(Ch, Encoded),
|
||||
ok = gen_tcp:send(Socket, Frame).
|
||||
|
||||
recv(Sock) ->
|
||||
{ok, <<Length:32/unsigned, 2:8/unsigned,
|
||||
_/unsigned, Ch:16/unsigned>>} = gen_tcp:recv(Sock, 8),
|
||||
{ok, Data} = gen_tcp:recv(Sock, Length - 8),
|
||||
{PerfDesc, Payload} = rabbit_amqp1_0_binary_parser:parse(Data),
|
||||
Perf = rabbit_amqp1_0_framing:decode(PerfDesc),
|
||||
{Ch, Perf, Payload}.
|
||||
|
||||
amqp_step(Fun) ->
|
||||
fun (Sock) ->
|
||||
Recv = recv(Sock),
|
||||
ct:pal("AMQP Step receieved ~p~n", [Recv]),
|
||||
case Fun(Recv) of
|
||||
{_Ch, []} -> ok;
|
||||
{Ch, Records} ->
|
||||
ct:pal("AMQP Step send ~p~n", [Records]),
|
||||
send(Sock, Ch, Records)
|
||||
end
|
||||
end.
|
||||
|
||||
|
||||
send_amqp_header_step(Sock) ->
|
||||
ct:pal("Sending AMQP protocol header"),
|
||||
ok = gen_tcp:send(Sock, ?AMQP_PROTOCOL_HEADER).
|
||||
|
||||
recv_amqp_header_step(Sock) ->
|
||||
ct:pal("Receiving AMQP protocol header"),
|
||||
R = gen_tcp:recv(Sock, 8),
|
||||
ct:pal("handshake Step receieved ~p~n", [R]).
|
||||
|
|
@ -48,16 +48,21 @@ all() ->
|
|||
groups() ->
|
||||
[
|
||||
{rabbitmq, [], [
|
||||
open_close_connection,
|
||||
basic_roundtrip,
|
||||
split_transfer,
|
||||
transfer_unsettled
|
||||
]},
|
||||
{activemq, [], [
|
||||
open_close_connection,
|
||||
basic_roundtrip,
|
||||
split_transfer,
|
||||
transfer_unsettled,
|
||||
send_multiple
|
||||
]}
|
||||
]},
|
||||
{mock, [], [
|
||||
mock1
|
||||
]}
|
||||
].
|
||||
|
||||
%% -------------------------------------------------------------------
|
||||
|
|
@ -96,7 +101,11 @@ init_per_group(rabbitmq, Config) ->
|
|||
init_per_group(activemq, Config) ->
|
||||
rabbit_ct_helpers:run_steps(
|
||||
Config,
|
||||
activemq_ct_helpers:setup_steps()).
|
||||
activemq_ct_helpers:setup_steps());
|
||||
init_per_group(mock, Config) ->
|
||||
rabbit_ct_helpers:set_config(Config, [{mock_port, 21000},
|
||||
{mock_host, "localhost"}
|
||||
]).
|
||||
|
||||
end_per_group(rabbitmq, Config) ->
|
||||
rabbit_ct_helpers:run_steps(
|
||||
|
|
@ -105,20 +114,38 @@ end_per_group(rabbitmq, Config) ->
|
|||
end_per_group(activemq, Config) ->
|
||||
rabbit_ct_helpers:run_steps(
|
||||
Config,
|
||||
activemq_ct_helpers:teardown_steps()).
|
||||
activemq_ct_helpers:teardown_steps());
|
||||
end_per_group(mock, Config) ->
|
||||
Config.
|
||||
|
||||
%% -------------------------------------------------------------------
|
||||
%% Test cases.
|
||||
%% -------------------------------------------------------------------
|
||||
|
||||
init_per_testcase(_, Config) ->
|
||||
Config.
|
||||
init_per_testcase(_Test, Config) ->
|
||||
case lists:keyfind(mock_port, 1, Config) of
|
||||
{_, Port} ->
|
||||
M = mock_server:start(Port),
|
||||
rabbit_ct_helpers:set_config(Config, {mock_server, M});
|
||||
_ -> Config
|
||||
end.
|
||||
|
||||
end_per_testcase(_, Config) ->
|
||||
Config.
|
||||
end_per_testcase(_Test, Config) ->
|
||||
case lists:keyfind(mock_server, 1, Config) of
|
||||
{_, M} -> mock_server:stop(M);
|
||||
_ -> Config
|
||||
end.
|
||||
|
||||
%% -------------------------------------------------------------------
|
||||
|
||||
open_close_connection(Config) ->
|
||||
Hostname = ?config(rmq_hostname, Config),
|
||||
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
|
||||
{ok, Connection} = amqp10_client_connection:open(Hostname, Port),
|
||||
{ok, Session} = amqp10_client_session:'begin'(Connection),
|
||||
ok = amqp10_client_session:'end'(Session),
|
||||
ok = amqp10_client_connection:close(Connection).
|
||||
|
||||
basic_roundtrip(Config) ->
|
||||
Hostname = ?config(rmq_hostname, Config),
|
||||
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
|
||||
|
|
@ -189,3 +216,41 @@ send_multiple(Config) ->
|
|||
|
||||
ok = amqp10_client_session:'end'(Session),
|
||||
ok = amqp10_client_connection:close(Connection).
|
||||
|
||||
|
||||
mock1(Config) ->
|
||||
Hostname = ?config(mock_host, Config),
|
||||
Port = ?config(mock_port, Config),
|
||||
OpenStep = fun({0 = Ch, #'v1_0.open'{}, _Pay}) ->
|
||||
{Ch, [#'v1_0.open'{container_id = {utf8, <<"mock">>}}]}
|
||||
end,
|
||||
BeginStep = fun({1 = Ch, #'v1_0.begin'{}, _Pay}) ->
|
||||
{Ch, [#'v1_0.begin'{remote_channel = {ushort, 1},
|
||||
next_outgoing_id = {uint, 1},
|
||||
incoming_window = {uint, 1000},
|
||||
outgoing_window = {uint, 1000}}
|
||||
]}
|
||||
end,
|
||||
AttachStep = fun({1 = Ch, #'v1_0.attach'{role = false, name = Name}, _Pay}) ->
|
||||
{Ch, [#'v1_0.attach'{name = Name,
|
||||
handle = {uint, 99},
|
||||
role = true}]}
|
||||
end,
|
||||
Steps = [
|
||||
fun mock_server:recv_amqp_header_step/1,
|
||||
fun mock_server:send_amqp_header_step/1,
|
||||
mock_server:amqp_step(OpenStep),
|
||||
mock_server:amqp_step(BeginStep),
|
||||
mock_server:amqp_step(AttachStep)
|
||||
],
|
||||
|
||||
ok = mock_server:set_steps(?config(mock_server, Config), Steps),
|
||||
|
||||
Cfg = #{address => Hostname, port => Port, sasl => none},
|
||||
{ok, Connection} = amqp10_client_connection:open(Cfg),
|
||||
{ok, Session} = amqp10_client_session:begin_sync(Connection),
|
||||
{ok, _Sender} = amqp10_client_link:sender(Session, <<"mock1-sender">>,
|
||||
<<"test">>),
|
||||
ok = amqp10_client_session:'end'(Session),
|
||||
ok = amqp10_client_connection:close(Connection),
|
||||
ok.
|
||||
|
|
|
|||
Loading…
Reference in New Issue