Merge pull request #49 from AntoineGagne/feature/migrate-to-gen-statem-frame-reader

Switch frame reader to gen_statem
This commit is contained in:
Michael Klishin 2020-05-10 11:24:22 +03:00 committed by GitHub
commit 5291bdc1e5
2 changed files with 68 additions and 71 deletions

View File

@ -18,11 +18,30 @@
-behaviour(application).
%% Application callbacks
-export([start/2,
stop/1]).
-type start_type() :: (
normal |
{takeover, Node :: node()} |
{failover, Node :: node()}
).
-type state() :: term().
%%====================================================================
%% API
%%====================================================================
-spec start(StartType :: start_type(), StartArgs :: term()) ->
{ok, Pid :: pid()} | {ok, Pid :: pid(), State :: state()} | {error, Reason :: term()}.
start(_Type, _Args) ->
amqp10_client_sup:start_link().
-spec stop(State :: state()) -> ok.
stop(_State) ->
ok.
%%====================================================================
%% Internal functions
%%====================================================================

View File

@ -15,7 +15,7 @@
%%
-module(amqp10_client_frame_reader).
-behaviour(gen_fsm).
-behaviour(gen_statem).
-include("amqp10_client.hrl").
-include_lib("amqp10_common/include/amqp10_framing.hrl").
@ -24,30 +24,19 @@
-include_lib("eunit/include/eunit.hrl").
-endif.
-ifdef(nowarn_deprecated_gen_fsm).
-compile({nowarn_deprecated_function,
[{gen_fsm, reply, 2},
{gen_fsm, send_all_state_event, 2},
{gen_fsm, send_event, 2},
{gen_fsm, start_link, 3},
{gen_fsm, sync_send_all_state_event, 2}]}).
-endif.
%% Private API.
%% API
-export([start_link/2,
set_connection/2,
close/1,
register_session/3,
unregister_session/4]).
%% gen_fsm callbacks.
%% gen_statem callbacks
-export([init/1,
expecting_connection_pid/2,
handle_event/3,
handle_sync_event/4,
handle_info/3,
terminate/3,
code_change/4]).
callback_mode/0,
handle_event/4,
code_change/4,
terminate/3]).
-define(RABBIT_TCP_OPTS, [binary,
{packet, 0},
@ -73,17 +62,14 @@
outgoing_channels = #{},
incoming_channels = #{}}).
%% -------------------------------------------------------------------
%% Private API.
%% -------------------------------------------------------------------
%% @private
%%%===================================================================
%%% API
%%%===================================================================
-spec start_link(pid(), amqp10_client_connection:connection_config()) ->
{ok, pid()} | ignore | {error, any()}.
start_link(Sup, Config) ->
gen_fsm:start_link(?MODULE, [Sup, Config], []).
gen_statem:start_link(?MODULE, [Sup, Config], []).
%% @private
%% @doc
@ -91,26 +77,25 @@ start_link(Sup, Config) ->
%%
%% This function is called when a connection supervision tree is
%% started.
-spec set_connection(pid(), pid()) -> ok.
-spec set_connection(Reader :: pid(), ConnectionPid :: pid()) -> ok.
set_connection(Reader, Connection) ->
gen_fsm:send_event(Reader, {set_connection, Connection}).
gen_statem:cast(Reader, {set_connection, Connection}).
close(Reader) ->
gen_fsm:send_all_state_event(Reader, close).
gen_statem:cast(Reader, close).
register_session(Reader, Session, OutgoingChannel) ->
gen_fsm:send_all_state_event(
Reader, {register_session, Session, OutgoingChannel}).
gen_statem:cast(Reader, {register_session, Session, OutgoingChannel}).
unregister_session(Reader, Session, OutgoingChannel, IncomingChannel) ->
gen_fsm:send_all_state_event(
Reader, {unregister_session, Session, OutgoingChannel, IncomingChannel}).
gen_statem:cast(Reader, {unregister_session, Session, OutgoingChannel, IncomingChannel}).
%% -------------------------------------------------------------------
%% gen_fsm callbacks.
%% -------------------------------------------------------------------
%%%===================================================================
%%% gen_statem callbacks
%%%===================================================================
callback_mode() ->
[handle_event_function].
init([Sup, ConnConfig]) when is_map(ConnConfig) ->
Port = maps:get(port, ConnConfig, 5672),
@ -141,41 +126,34 @@ init([Sup, ConnConfig]) when is_map(ConnConfig) ->
{stop, Reason}
end.
expecting_connection_pid({set_connection, ConnectionPid},
#state{socket = Socket} = State) ->
handle_event(cast, {set_connection, ConnectionPid}, expecting_connection_pid,
State=#state{socket = Socket}) ->
ok = amqp10_client_connection:socket_ready(ConnectionPid, Socket),
set_active_once(State),
State1 = State#state{connection = ConnectionPid},
{next_state, expecting_frame_header, State1}.
handle_event({register_session, Session, OutgoingChannel},
StateName,
#state{socket = Socket,
outgoing_channels = OutgoingChannels} = State) ->
{next_state, expecting_frame_header, State1};
handle_event(cast, {register_session, Session, OutgoingChannel}, _StateName,
#state{socket = Socket, outgoing_channels = OutgoingChannels} = State) ->
ok = amqp10_client_session:socket_ready(Session, Socket),
OutgoingChannels1 = OutgoingChannels#{OutgoingChannel => Session},
State1 = State#state{outgoing_channels = OutgoingChannels1},
{next_state, StateName, State1};
handle_event({unregister_session, _Session, OutgoingChannel, IncomingChannel},
StateName,
#state{outgoing_channels = OutgoingChannels,
incoming_channels = IncomingChannels} = State) ->
{keep_state, State1};
handle_event(cast, {unregister_session, _Session, OutgoingChannel, IncomingChannel}, _StateName,
State=#state{outgoing_channels = OutgoingChannels,
incoming_channels = IncomingChannels}) ->
OutgoingChannels1 = maps:remove(OutgoingChannel, OutgoingChannels),
IncomingChannels1 = maps:remove(IncomingChannel, IncomingChannels),
State1 = State#state{outgoing_channels = OutgoingChannels1,
incoming_channels = IncomingChannels1},
{next_state, StateName, State1};
handle_event(close, _StateName, State = #state{socket = Socket}) ->
{keep_state, State1};
handle_event(cast, close, _StateName, State = #state{socket = Socket}) ->
close_socket(Socket),
{stop, normal, State#state{socket = undefined}};
handle_event(_Event, StateName, State) ->
{next_state, StateName, State}.
handle_sync_event(_Event, _From, StateName, State) ->
Reply = ok,
{reply, Reply, StateName, State}.
handle_event({call, From}, _Action, _State, _Data) ->
{keep_state_and_data, [{reply, From, ok}]};
handle_info({Tcp, _, Packet}, StateName, #state{buffer = Buffer} = State)
handle_event(info, {Tcp, _, Packet}, StateName, #state{buffer = Buffer} = State)
when Tcp == tcp orelse Tcp == ssl ->
Data = <<Buffer/binary, Packet/binary>>,
case handle_input(StateName, Data, State) of
@ -187,7 +165,7 @@ handle_info({Tcp, _, Packet}, StateName, #state{buffer = Buffer} = State)
{stop, Reason, NewState}
end;
handle_info({TcpError, _, Reason}, StateName, State)
handle_event(info, {TcpError, _, Reason}, StateName, State)
when TcpError == tcp_error orelse TcpError == ssl_error ->
error_logger:warning_msg("AMQP 1.0 connection socket errored, connection state: '~s', reason: '~p'~n",
[StateName, Reason]),
@ -195,7 +173,7 @@ handle_info({TcpError, _, Reason}, StateName, State)
buffer = <<>>,
frame_state = undefined},
{stop, {error, Reason}, State1};
handle_info({TcpClosed, _}, StateName, State)
handle_event(info, {TcpClosed, _}, StateName, State)
when TcpClosed == tcp_closed orelse TcpClosed == ssl_closed ->
error_logger:warning_msg("AMQP 1.0 connection socket was closed, connection state: '~s'~n",
[StateName]),
@ -204,29 +182,29 @@ handle_info({TcpClosed, _}, StateName, State)
frame_state = undefined},
{stop, normal, State1};
handle_info(heartbeat, StateName, State = #state{connection = Conn}) ->
amqp10_client_connection:close(Conn, {resource_limit_exceeded,
<<"remote idle-time-out">>}),
handle_event(info, heartbeat, _StateName, #state{connection = Connection}) ->
amqp10_client_connection:close(Connection,
{resource_limit_exceeded, <<"remote idle-time-out">>}),
% do not stop as may want to read the peer's close frame
{next_state, StateName, State}.
keep_state_and_data.
terminate(normal, _StateName, #state{connection_sup = _Sup, socket = Socket}) ->
maybe_close_socket(Socket);
terminate(_Reason, _StateName, #state{connection_sup = _Sup, socket = Socket}) ->
maybe_close_socket(Socket).
code_change(_Vsn, State, Data, _Extra) ->
{ok, State, Data}.
%%%===================================================================
%%% Internal functions
%%%===================================================================
maybe_close_socket(undefined) ->
ok;
maybe_close_socket(Socket) ->
close_socket(Socket).
code_change(_OldVsn, StateName, State, _Extra) ->
{ok, StateName, State}.
%% -------------------------------------------------------------------
%% Internal functions.
%% -------------------------------------------------------------------
close_socket({tcp, Socket}) ->
gen_tcp:close(Socket);
close_socket({ssl, Socket}) ->