Convert rabbit_stream_reader into state machine
This is pure refactoring - no functional change. Benefits: * code is more maintainable * smaller methods (instead of previous 350 lines listen_loop_post_auth function) * well defined state transitions (e.g. useful to enforce authentication protocol) * we get some gen_statem helper functions for free (e.g. debug utilities) Useful doc: https://ninenines.eu/docs/en/ranch/2.0/guide/protocols/
This commit is contained in:
parent
ff174eaa5f
commit
81ee05f9ce
|
|
@ -16,6 +16,8 @@
|
|||
|
||||
-module(rabbit_stream_reader).
|
||||
|
||||
-behaviour(gen_statem).
|
||||
|
||||
-include_lib("rabbit_common/include/rabbit.hrl").
|
||||
-include_lib("rabbitmq_stream_common/include/rabbit_stream.hrl").
|
||||
|
||||
|
|
@ -86,6 +88,12 @@
|
|||
frame_max :: integer(),
|
||||
heartbeat :: integer()}).
|
||||
|
||||
-record(statem_data,
|
||||
{transport :: module(),
|
||||
connection :: #stream_connection{},
|
||||
connection_state :: #stream_connection_state{},
|
||||
config :: #configuration{}}).
|
||||
|
||||
-define(CREATION_EVENT_KEYS,
|
||||
[pid,
|
||||
name,
|
||||
|
|
@ -136,20 +144,33 @@
|
|||
peer_cert_subject,
|
||||
peer_cert_validity]).
|
||||
|
||||
%% API
|
||||
%% client API
|
||||
-export([start_link/4,
|
||||
init/1,
|
||||
info/2,
|
||||
consumers_info/2,
|
||||
publishers_info/2,
|
||||
in_vhost/2]).
|
||||
|
||||
-export([resource_alarm/3]).
|
||||
|
||||
start_link(KeepaliveSup, Transport, Ref, Opts) ->
|
||||
Pid = proc_lib:spawn_link(?MODULE, init,
|
||||
[[KeepaliveSup, Transport, Ref, Opts]]),
|
||||
%% gen_statem callbacks
|
||||
-export([callback_mode/0,
|
||||
%% not called by gen_statem since we gen_statem:enter_loop/4
|
||||
init/1,
|
||||
%% states
|
||||
tcp_connected/3,
|
||||
peer_properties_exchanged/3,
|
||||
authenticating/3,
|
||||
tuning/3,
|
||||
tuned/3,
|
||||
open/3,
|
||||
close_sent/3]).
|
||||
|
||||
{ok, Pid}.
|
||||
callback_mode() ->
|
||||
state_functions.
|
||||
|
||||
start_link(KeepaliveSup, Transport, Ref, Opts) ->
|
||||
{ok, proc_lib:spawn_link(?MODULE, init, [[KeepaliveSup, Transport, Ref, Opts]])}.
|
||||
|
||||
init([KeepaliveSup,
|
||||
Transport,
|
||||
|
|
@ -202,25 +223,191 @@ init([KeepaliveSup,
|
|||
data =
|
||||
rabbit_stream_core:init(undefined)},
|
||||
Transport:setopts(RealSocket, [{active, once}]),
|
||||
|
||||
rabbit_alarm:register(self(), {?MODULE, resource_alarm, []}),
|
||||
|
||||
listen_loop_pre_auth(Transport,
|
||||
Connection,
|
||||
State,
|
||||
#configuration{initial_credits =
|
||||
InitialCredits,
|
||||
credits_required_for_unblocking
|
||||
=
|
||||
CreditsRequiredBeforeUnblocking,
|
||||
% gen_statem process has its start_link call not return until the init function returns.
|
||||
% This is problematic, because we won't be able to call ranch:handshake/2
|
||||
% from the init callback as this would cause a deadlock to happen.
|
||||
% Therefore, we use the gen_statem:enter_loop/4 function.
|
||||
% See https://ninenines.eu/docs/en/ranch/2.0/guide/protocols/
|
||||
gen_statem:enter_loop(?MODULE, [], tcp_connected,
|
||||
#statem_data{
|
||||
transport = Transport,
|
||||
connection = Connection,
|
||||
connection_state = State,
|
||||
config = #configuration{
|
||||
initial_credits = InitialCredits,
|
||||
credits_required_for_unblocking = CreditsRequiredBeforeUnblocking,
|
||||
frame_max = FrameMax,
|
||||
heartbeat = Heartbeat});
|
||||
heartbeat = Heartbeat}});
|
||||
{Error, Reason} ->
|
||||
rabbit_net:fast_close(RealSocket),
|
||||
rabbit_log_connection:warning("Closing connection because of ~p ~p",
|
||||
[Error, Reason])
|
||||
end.
|
||||
|
||||
tcp_connected(info, Msg, StateData) ->
|
||||
handle_info(Msg, StateData,
|
||||
fun (NextConnectionStep,
|
||||
#statem_data{
|
||||
transport = Transport,
|
||||
connection = #stream_connection{socket = S}
|
||||
} = StatemData,
|
||||
NewConnection,
|
||||
NewConnectionState) ->
|
||||
if NextConnectionStep =:= peer_properties_exchanged ->
|
||||
{next_state, peer_properties_exchanged, StatemData#statem_data{
|
||||
connection = NewConnection,
|
||||
connection_state = NewConnectionState
|
||||
}};
|
||||
true ->
|
||||
invalid_transition(Transport, S, ?FUNCTION_NAME, NextConnectionStep)
|
||||
end
|
||||
end).
|
||||
|
||||
peer_properties_exchanged(info, Msg, StateData) ->
|
||||
handle_info(Msg, StateData,
|
||||
fun (NextConnectionStep,
|
||||
#statem_data{
|
||||
transport = Transport,
|
||||
connection = #stream_connection{socket = S}
|
||||
} = StatemData,
|
||||
NewConnection,
|
||||
NewConnectionState) ->
|
||||
if NextConnectionStep =:= authenticating ->
|
||||
{next_state, authenticating, StatemData#statem_data{
|
||||
connection = NewConnection,
|
||||
connection_state = NewConnectionState
|
||||
}};
|
||||
true ->
|
||||
invalid_transition(Transport, S, ?FUNCTION_NAME, NextConnectionStep)
|
||||
end
|
||||
end).
|
||||
|
||||
authenticating(info, Msg, StateData) ->
|
||||
handle_info(Msg, StateData,
|
||||
fun(NextConnectionStep,
|
||||
#statem_data{
|
||||
transport = Transport,
|
||||
connection = #stream_connection{socket = S},
|
||||
config = #configuration{
|
||||
frame_max = FrameMax,
|
||||
heartbeat = Heartbeat}
|
||||
} = StatemData,
|
||||
NewConnection,
|
||||
NewConnectionState) ->
|
||||
if NextConnectionStep =:= authenticated ->
|
||||
Frame = rabbit_stream_core:frame({tune, FrameMax, Heartbeat}),
|
||||
send(Transport, S, Frame),
|
||||
{next_state, tuning, StatemData#statem_data{
|
||||
connection = NewConnection#stream_connection{connection_step
|
||||
=
|
||||
tuning},
|
||||
connection_state = NewConnectionState
|
||||
}};
|
||||
true ->
|
||||
invalid_transition(Transport, S, ?FUNCTION_NAME, NextConnectionStep)
|
||||
end
|
||||
end).
|
||||
|
||||
tuning(info, Msg, StateData) ->
|
||||
handle_info(Msg, StateData,
|
||||
fun (NextConnectionStep,
|
||||
#statem_data{
|
||||
transport = Transport,
|
||||
connection = #stream_connection{socket = S},
|
||||
config = Configuration
|
||||
} = StatemData,
|
||||
NewConnection,
|
||||
NewConnectionState) ->
|
||||
case NextConnectionStep of
|
||||
tuned ->
|
||||
{next_state, tuned, StatemData#statem_data{
|
||||
connection = NewConnection,
|
||||
connection_state = NewConnectionState
|
||||
}};
|
||||
opened ->
|
||||
transition_to_opened(Transport, Configuration, NewConnection, NewConnectionState);
|
||||
_ ->
|
||||
invalid_transition(Transport, S, ?FUNCTION_NAME, NextConnectionStep)
|
||||
end
|
||||
end).
|
||||
|
||||
tuned(info, Msg, StateData) ->
|
||||
handle_info(Msg, StateData,
|
||||
fun (NextConnectionStep,
|
||||
#statem_data{
|
||||
transport = Transport,
|
||||
connection = #stream_connection{socket = S},
|
||||
config = Configuration},
|
||||
NewConnection, NewConnectionState) ->
|
||||
if NextConnectionStep =:= opened ->
|
||||
transition_to_opened(Transport, Configuration, NewConnection, NewConnectionState);
|
||||
true ->
|
||||
invalid_transition(Transport, S, ?FUNCTION_NAME, NextConnectionStep)
|
||||
end
|
||||
end).
|
||||
|
||||
handle_info(Msg, #statem_data{
|
||||
transport = Transport,
|
||||
connection = #stream_connection{socket = S, connection_step = PreviousConnectionStep} = Connection,
|
||||
connection_state = State
|
||||
} = StatemData, Transition) ->
|
||||
{OK, Closed, Error, _Passive} = Transport:messages(),
|
||||
case Msg of
|
||||
{OK, S, Data} ->
|
||||
{Connection1, State1} = handle_inbound_data_pre_auth(Transport, Connection, State, Data),
|
||||
Transport:setopts(S, [{active, once}]),
|
||||
#stream_connection{connection_step = NewConnectionStep} = Connection1,
|
||||
rabbit_log_connection:debug("Transitioned from ~s to ~s", [PreviousConnectionStep, NewConnectionStep]),
|
||||
Transition(NewConnectionStep, StatemData, Connection1, State1);
|
||||
{Closed, S} ->
|
||||
rabbit_log_connection:info("Socket ~w closed", [S]),
|
||||
stop;
|
||||
{Error, S, Reason} ->
|
||||
rabbit_log_connection:warning("Socket error ~p [~w]", [Reason, S]),
|
||||
stop;
|
||||
{resource_alarm, IsThereAlarm} ->
|
||||
{keep_state, StatemData#statem_data{
|
||||
connection = Connection#stream_connection{
|
||||
resource_alarm = IsThereAlarm},
|
||||
connection_state = State#stream_connection_state{
|
||||
blocked = true}}};
|
||||
Unknown ->
|
||||
rabbit_log:warning("Received unknown message ~p", [Unknown]),
|
||||
close_immediately(Transport, S),
|
||||
stop
|
||||
end.
|
||||
|
||||
transition_to_opened(Transport, Configuration, NewConnection, NewConnectionState) ->
|
||||
% TODO remove registration to rabbit_stream_connections
|
||||
% just meant to be able to close the connection remotely
|
||||
% should be possible once the connections are available in ctl list_connections
|
||||
pg_local:join(rabbit_stream_connections, self()),
|
||||
Connection1 =
|
||||
rabbit_event:init_stats_timer(NewConnection,
|
||||
#stream_connection.stats_timer),
|
||||
Connection2 = ensure_stats_timer(Connection1),
|
||||
Infos =
|
||||
augment_infos_with_user_provided_connection_name(infos(?CREATION_EVENT_KEYS,
|
||||
Connection2,
|
||||
NewConnectionState),
|
||||
Connection2),
|
||||
rabbit_core_metrics:connection_created(self(), Infos),
|
||||
rabbit_event:notify(connection_created, Infos),
|
||||
rabbit_networking:register_non_amqp_connection(self()),
|
||||
{next_state, open, #statem_data{
|
||||
transport = Transport,
|
||||
connection = Connection2,
|
||||
connection_state = NewConnectionState,
|
||||
config = Configuration
|
||||
}}.
|
||||
|
||||
invalid_transition(Transport, Socket, From, To) ->
|
||||
rabbit_log_connection:warning("Closing socket ~w. Invalid transition from ~s to ~s.",
|
||||
[Socket, From, To]),
|
||||
close_immediately(Transport, Socket),
|
||||
stop.
|
||||
|
||||
resource_alarm(ConnectionPid, disk,
|
||||
{_WasAlarmSetForNode,
|
||||
IsThereAnyAlarmsForSameResourceInTheCluster, _Node}) ->
|
||||
|
|
@ -311,101 +498,6 @@ messages_errored(Counters) ->
|
|||
stream_committed_offset(Log) ->
|
||||
osiris_log:committed_offset(Log).
|
||||
|
||||
listen_loop_pre_auth(Transport,
|
||||
#stream_connection{socket = S} = Connection,
|
||||
State,
|
||||
#configuration{frame_max = FrameMax,
|
||||
heartbeat = Heartbeat} =
|
||||
Configuration) ->
|
||||
{OK, Closed, Error, _Passive} = Transport:messages(),
|
||||
%% FIXME introduce timeout to complete the connection opening (after block should be enough)
|
||||
receive
|
||||
{resource_alarm, IsThereAlarm} ->
|
||||
listen_loop_pre_auth(Transport,
|
||||
Connection#stream_connection{resource_alarm =
|
||||
IsThereAlarm},
|
||||
State#stream_connection_state{blocked = true},
|
||||
Configuration);
|
||||
{OK, S, Data} ->
|
||||
#stream_connection{connection_step = ConnectionStep0} = Connection,
|
||||
{Connection1, State1} =
|
||||
handle_inbound_data_pre_auth(Transport,
|
||||
Connection,
|
||||
State,
|
||||
Data),
|
||||
Transport:setopts(S, [{active, once}]),
|
||||
#stream_connection{connection_step = ConnectionStep} = Connection1,
|
||||
rabbit_log:info("Transitioned from ~p to ~p",
|
||||
[ConnectionStep0, ConnectionStep]),
|
||||
case ConnectionStep of
|
||||
peer_properties_exchanged when ConnectionStep0 =:= tcp_connected ->
|
||||
listen_loop_pre_auth(Transport,
|
||||
Connection1,
|
||||
State1,
|
||||
Configuration);
|
||||
authenticating when ConnectionStep0 =:= peer_properties_exchanged ->
|
||||
listen_loop_pre_auth(Transport,
|
||||
Connection1,
|
||||
State1,
|
||||
Configuration);
|
||||
tuned when ConnectionStep0 =:= tuning ->
|
||||
listen_loop_pre_auth(Transport,
|
||||
Connection1,
|
||||
State1,
|
||||
Configuration);
|
||||
authenticated ->
|
||||
Frame =
|
||||
rabbit_stream_core:frame({tune, FrameMax, Heartbeat}),
|
||||
send(Transport, S, Frame),
|
||||
listen_loop_pre_auth(Transport,
|
||||
Connection1#stream_connection{connection_step
|
||||
=
|
||||
tuning},
|
||||
State1,
|
||||
Configuration);
|
||||
opened ->
|
||||
% TODO remove registration to rabbit_stream_connections
|
||||
% just meant to be able to close the connection remotely
|
||||
% should be possible once the connections are available in ctl list_connections
|
||||
pg_local:join(rabbit_stream_connections, self()),
|
||||
Connection2 =
|
||||
rabbit_event:init_stats_timer(Connection1,
|
||||
#stream_connection.stats_timer),
|
||||
Connection3 = ensure_stats_timer(Connection2),
|
||||
Infos =
|
||||
augment_infos_with_user_provided_connection_name(infos(?CREATION_EVENT_KEYS,
|
||||
Connection3,
|
||||
State1),
|
||||
Connection3),
|
||||
rabbit_core_metrics:connection_created(self(), Infos),
|
||||
rabbit_event:notify(connection_created, Infos),
|
||||
rabbit_networking:register_non_amqp_connection(self()),
|
||||
listen_loop_post_auth(Transport,
|
||||
Connection3,
|
||||
State1,
|
||||
Configuration);
|
||||
_ ->
|
||||
rabbit_log_connection:warning("Closing socket ~w because transition from ~p to ~p is not allowed",
|
||||
[S, ConnectionStep0, ConnectionStep]),
|
||||
case Connection1#stream_connection.authentication_state of
|
||||
done ->
|
||||
close(Transport, S, State);
|
||||
_ ->
|
||||
% for unauthenticated clients, shut down socket immediately
|
||||
Transport:shutdown(S, read),
|
||||
Transport:close(S)
|
||||
end
|
||||
end;
|
||||
{Closed, S} ->
|
||||
rabbit_log_connection:info("Socket ~w closed", [S]),
|
||||
ok;
|
||||
{Error, S, Reason} ->
|
||||
rabbit_log_connection:info("Socket error ~p [~w]", [Reason, S]);
|
||||
M ->
|
||||
rabbit_log:warning("Unknown message ~p", [M]),
|
||||
close(Transport, S, State)
|
||||
end.
|
||||
|
||||
augment_infos_with_user_provided_connection_name(Infos,
|
||||
#stream_connection{client_properties
|
||||
=
|
||||
|
|
@ -424,29 +516,26 @@ close(Transport, S,
|
|||
Transport:shutdown(S, write),
|
||||
Transport:close(S).
|
||||
|
||||
listen_loop_post_auth(Transport,
|
||||
#stream_connection{socket = S,
|
||||
% Do not read or write any further data from / to Socket.
|
||||
% Useful to close sockets for unauthenticated clients.
|
||||
close_immediately(Transport, S) ->
|
||||
Transport:shutdown(S, read),
|
||||
Transport:close(S).
|
||||
|
||||
open(info,
|
||||
{resource_alarm, IsThereAlarm},
|
||||
#statem_data{
|
||||
transport = Transport,
|
||||
connection = #stream_connection{
|
||||
socket = S,
|
||||
name = ConnectionName,
|
||||
stream_subscriptions =
|
||||
StreamSubscriptions,
|
||||
credits = Credits,
|
||||
heartbeater = Heartbeater,
|
||||
monitors = Monitors,
|
||||
client_properties = ClientProperties,
|
||||
publishers = Publishers,
|
||||
publisher_to_ids = PublisherRefToIds,
|
||||
send_file_oct = SendFileOct} =
|
||||
Connection0,
|
||||
#stream_connection_state{consumers = Consumers,
|
||||
blocked = Blocked} =
|
||||
State,
|
||||
#configuration{credits_required_for_unblocking =
|
||||
CreditsRequiredForUnblocking} =
|
||||
Configuration) ->
|
||||
heartbeater = Heartbeater
|
||||
} = Connection0,
|
||||
connection_state = #stream_connection_state{blocked = Blocked} = State,
|
||||
config = #configuration{credits_required_for_unblocking = CreditsRequiredForUnblocking}
|
||||
} = StatemData) ->
|
||||
Connection = ensure_stats_timer(Connection0),
|
||||
{OK, Closed, Error, _Passive} = Transport:messages(),
|
||||
receive
|
||||
{resource_alarm, IsThereAlarm} ->
|
||||
rabbit_log_connection:debug("Connection ~p received resource alarm. Alarm "
|
||||
"on? ~p",
|
||||
[ConnectionName, IsThereAlarm]),
|
||||
|
|
@ -476,15 +565,25 @@ listen_loop_post_auth(Transport,
|
|||
_ ->
|
||||
ok
|
||||
end,
|
||||
listen_loop_post_auth(Transport,
|
||||
Connection#stream_connection{resource_alarm =
|
||||
IsThereAlarm},
|
||||
State#stream_connection_state{blocked =
|
||||
NewBlockedState},
|
||||
Configuration);
|
||||
{OK, S, Data} ->
|
||||
{Connection1, State1} =
|
||||
handle_inbound_data_post_auth(Transport,
|
||||
{keep_state,
|
||||
StatemData#statem_data{
|
||||
connection = Connection#stream_connection{resource_alarm = IsThereAlarm},
|
||||
connection_state = State#stream_connection_state{blocked = NewBlockedState}
|
||||
}};
|
||||
open(info,
|
||||
{OK, S, Data},
|
||||
#statem_data{
|
||||
transport = Transport,
|
||||
connection = #stream_connection{socket = S,
|
||||
credits = Credits,
|
||||
heartbeater = Heartbeater
|
||||
} = Connection0,
|
||||
connection_state = #stream_connection_state{blocked = Blocked} = State,
|
||||
config = Configuration
|
||||
} = StatemData)
|
||||
when OK =:= tcp; OK =:= ssl ->
|
||||
Connection = ensure_stats_timer(Connection0),
|
||||
{Connection1, State1} = handle_inbound_data_post_auth(Transport,
|
||||
Connection,
|
||||
State,
|
||||
Data),
|
||||
|
|
@ -493,14 +592,15 @@ listen_loop_post_auth(Transport,
|
|||
closing ->
|
||||
close(Transport, S, State),
|
||||
rabbit_networking:unregister_non_amqp_connection(self()),
|
||||
notify_connection_closed(Connection1, State1);
|
||||
notify_connection_closed(Connection1, State1),
|
||||
stop;
|
||||
close_sent ->
|
||||
rabbit_log_connection:debug("Transitioned to close_sent"),
|
||||
Transport:setopts(S, [{active, once}]),
|
||||
listen_loop_post_close(Transport,
|
||||
Connection1,
|
||||
State1,
|
||||
Configuration);
|
||||
{next_state, close_sent, StatemData#statem_data{
|
||||
connection = Connection1,
|
||||
connection_state = State1
|
||||
}};
|
||||
_ ->
|
||||
State2 =
|
||||
case Blocked of
|
||||
|
|
@ -528,12 +628,40 @@ listen_loop_post_auth(Transport,
|
|||
true}
|
||||
end
|
||||
end,
|
||||
listen_loop_post_auth(Transport,
|
||||
Connection1,
|
||||
State2,
|
||||
Configuration)
|
||||
{keep_state, StatemData#statem_data{
|
||||
connection = Connection1,
|
||||
connection_state = State2}}
|
||||
end;
|
||||
{'DOWN', MonitorRef, process, _OsirisPid, _Reason} ->
|
||||
open(info, {Closed, Socket}, #statem_data{
|
||||
connection = Connection,
|
||||
connection_state = State
|
||||
})
|
||||
when Closed =:= tcp_closed; Closed =:= ssl_closed ->
|
||||
demonitor_all_streams(Connection),
|
||||
rabbit_networking:unregister_non_amqp_connection(self()),
|
||||
notify_connection_closed(Connection, State),
|
||||
rabbit_log_connection:warning("Socket ~w closed [~w]", [Socket, self()]),
|
||||
stop;
|
||||
open(info, {Error, Socket, Reason}, #statem_data{
|
||||
connection = Connection,
|
||||
connection_state = State
|
||||
})
|
||||
when Error =:= tcp_error; Error =:= ssl_error ->
|
||||
demonitor_all_streams(Connection),
|
||||
rabbit_networking:unregister_non_amqp_connection(self()),
|
||||
notify_connection_closed(Connection, State),
|
||||
rabbit_log_connection:error("Socket error ~p [~w] [~w]", [Reason, Socket, self()]),
|
||||
stop;
|
||||
open(info,
|
||||
{'DOWN', MonitorRef, process, _OsirisPid, _Reason},
|
||||
#statem_data{
|
||||
transport = Transport,
|
||||
connection = #stream_connection{socket = S,
|
||||
monitors = Monitors
|
||||
} = Connection0,
|
||||
connection_state = State
|
||||
} = StatemData) ->
|
||||
Connection = ensure_stats_timer(Connection0),
|
||||
{Connection1, State1} =
|
||||
case Monitors of
|
||||
#{MonitorRef := Stream} ->
|
||||
|
|
@ -557,12 +685,88 @@ listen_loop_post_auth(Transport,
|
|||
_ ->
|
||||
{Connection, State}
|
||||
end,
|
||||
listen_loop_post_auth(Transport,
|
||||
Connection1,
|
||||
State1,
|
||||
Configuration);
|
||||
{'$gen_cast',
|
||||
{queue_event, _, {osiris_written, _, undefined, CorrelationList}}} ->
|
||||
{keep_state, StatemData#statem_data{
|
||||
connection = Connection1,
|
||||
connection_state = State1}};
|
||||
open(info, heartbeat_send, #statem_data{
|
||||
transport = Transport,
|
||||
connection = #stream_connection{socket = S} = Connection,
|
||||
connection_state = State }) ->
|
||||
Frame = rabbit_stream_core:frame(heartbeat),
|
||||
case catch send(Transport, S, Frame) of
|
||||
ok ->
|
||||
keep_state_and_data;
|
||||
Unexpected ->
|
||||
rabbit_log_connection:info("Heartbeat send error ~p, closing connection",
|
||||
[Unexpected]),
|
||||
C1 = demonitor_all_streams(Connection),
|
||||
close(Transport, C1, State),
|
||||
stop
|
||||
end;
|
||||
open(info, heartbeat_timeout, #statem_data{
|
||||
transport = Transport,
|
||||
connection = Connection,
|
||||
connection_state = State }) ->
|
||||
rabbit_log_connection:debug("Heartbeat timeout, closing connection"),
|
||||
C1 = demonitor_all_streams(Connection),
|
||||
close(Transport, C1, State),
|
||||
stop;
|
||||
open(info, {infos, From}, #statem_data{
|
||||
connection = #stream_connection{
|
||||
client_properties = ClientProperties
|
||||
}}) ->
|
||||
From ! {self(), ClientProperties},
|
||||
keep_state_and_data;
|
||||
open(info, emit_stats, #statem_data{
|
||||
connection = Connection,
|
||||
connection_state = State} = StatemData) ->
|
||||
Connection1 = emit_stats(Connection, State),
|
||||
{keep_state, StatemData#statem_data{connection = Connection1}};
|
||||
open(info, Unknown, _StatemData) ->
|
||||
rabbit_log_connection:warning("Received unknown message ~p in state ~s",
|
||||
[Unknown, ?FUNCTION_NAME]),
|
||||
%% FIXME send close
|
||||
keep_state_and_data;
|
||||
open({call, From}, info, #statem_data{
|
||||
connection = Connection,
|
||||
connection_state = State}) ->
|
||||
{keep_state_and_data, {reply, From, infos(?INFO_ITEMS, Connection, State)}};
|
||||
open({call, From}, {info, Items}, #statem_data{
|
||||
connection = Connection,
|
||||
connection_state = State}) ->
|
||||
{keep_state_and_data, {reply, From, infos(Items, Connection, State)}};
|
||||
open({call, From}, {consumers_info, Items}, #statem_data{connection_state = State}) ->
|
||||
{keep_state_and_data, {reply, From, consumers_infos(Items, State)}};
|
||||
open({call, From}, {publishers_info, Items}, #statem_data{connection = Connection}) ->
|
||||
{keep_state_and_data, {reply, From, publishers_infos(Items, Connection)}};
|
||||
open({call, From}, {shutdown, Explanation}, #statem_data{
|
||||
transport = Transport,
|
||||
connection = #stream_connection{socket = S}
|
||||
= Connection,
|
||||
connection_state = State}) ->
|
||||
% likely closing call from the management plugin
|
||||
rabbit_log_connection:info("Forcing stream connection ~p closing: ~p",
|
||||
[self(), Explanation]),
|
||||
demonitor_all_streams(Connection),
|
||||
rabbit_networking:unregister_non_amqp_connection(self()),
|
||||
notify_connection_closed(Connection, State),
|
||||
close(Transport, S, State),
|
||||
{stop_and_reply, normal, {reply, From, ok}};
|
||||
open(cast,
|
||||
{queue_event, _, {osiris_written, _, undefined, CorrelationList}},
|
||||
#statem_data{
|
||||
transport = Transport,
|
||||
connection = #stream_connection{socket = S,
|
||||
credits = Credits,
|
||||
heartbeater = Heartbeater,
|
||||
publishers = Publishers
|
||||
} = Connection0,
|
||||
connection_state = #stream_connection_state{
|
||||
blocked = Blocked
|
||||
} = State,
|
||||
config = Configuration
|
||||
} = StatemData) ->
|
||||
Connection = ensure_stats_timer(Connection0),
|
||||
ByPublisher =
|
||||
lists:foldr(fun({PublisherId, PublishingId}, Acc) ->
|
||||
case maps:get(PublisherId, Acc, undefined) of
|
||||
|
|
@ -585,11 +789,9 @@ listen_loop_post_auth(Transport,
|
|||
length(PublishingIds))
|
||||
end,
|
||||
ByPublisher),
|
||||
|
||||
CorrelationIdCount = length(CorrelationList),
|
||||
add_credits(Credits, CorrelationIdCount),
|
||||
State1 =
|
||||
case Blocked of
|
||||
State1 = case Blocked of
|
||||
true ->
|
||||
case should_unblock(Connection, Configuration) of
|
||||
true ->
|
||||
|
|
@ -603,13 +805,24 @@ listen_loop_post_auth(Transport,
|
|||
false ->
|
||||
State
|
||||
end,
|
||||
listen_loop_post_auth(Transport, Connection, State1, Configuration);
|
||||
{'$gen_cast',
|
||||
{keep_state, StatemData#statem_data{connection_state = State1}};
|
||||
open(cast,
|
||||
{queue_event, _QueueResource,
|
||||
{osiris_written,
|
||||
#resource{name = Stream},
|
||||
PublisherReference,
|
||||
CorrelationList}}} ->
|
||||
{osiris_written, #resource{name = Stream}, PublisherReference, CorrelationList}},
|
||||
#statem_data{
|
||||
transport = Transport,
|
||||
connection = #stream_connection{socket = S,
|
||||
credits = Credits,
|
||||
heartbeater = Heartbeater,
|
||||
publishers = Publishers,
|
||||
publisher_to_ids = PublisherRefToIds
|
||||
} = Connection0,
|
||||
connection_state = #stream_connection_state{
|
||||
blocked = Blocked
|
||||
} = State,
|
||||
config = Configuration
|
||||
} = StatemData) ->
|
||||
Connection = ensure_stats_timer(Connection0),
|
||||
%% FIXME handle case when publisher ID is not found (e.g. deleted before confirms arrive)
|
||||
PublisherId =
|
||||
maps:get({Stream, PublisherReference}, PublisherRefToIds,
|
||||
|
|
@ -636,17 +849,25 @@ listen_loop_post_auth(Transport,
|
|||
false ->
|
||||
State
|
||||
end,
|
||||
listen_loop_post_auth(Transport, Connection, State1, Configuration);
|
||||
{'$gen_cast',
|
||||
{queue_event, #resource{name = StreamName},
|
||||
{osiris_offset, _QueueResource, -1}}} ->
|
||||
{keep_state, StatemData#statem_data{connection_state = State1}};
|
||||
open(cast, {queue_event, #resource{name = StreamName},
|
||||
{osiris_offset, _QueueResource, -1}}, _StatemData) ->
|
||||
rabbit_log:info("received osiris offset event for ~p with offset ~p",
|
||||
[StreamName, -1]),
|
||||
listen_loop_post_auth(Transport, Connection, State, Configuration);
|
||||
{'$gen_cast',
|
||||
keep_state_and_data;
|
||||
open(cast,
|
||||
{queue_event, #resource{name = StreamName},
|
||||
{osiris_offset, _QueueResource, Offset}}}
|
||||
{osiris_offset, _QueueResource, Offset}},
|
||||
#statem_data{
|
||||
transport = Transport,
|
||||
connection = #stream_connection{
|
||||
stream_subscriptions = StreamSubscriptions,
|
||||
send_file_oct = SendFileOct
|
||||
} = Connection0,
|
||||
connection_state = #stream_connection_state{consumers = Consumers} = State
|
||||
} = StatemData)
|
||||
when Offset > -1 ->
|
||||
Connection = ensure_stats_timer(Connection0),
|
||||
{Connection1, State1} =
|
||||
case maps:get(StreamName, StreamSubscriptions, undefined) of
|
||||
undefined ->
|
||||
|
|
@ -701,47 +922,13 @@ listen_loop_post_auth(Transport,
|
|||
{Connection,
|
||||
State#stream_connection_state{consumers = Consumers1}}
|
||||
end,
|
||||
listen_loop_post_auth(Transport,
|
||||
Connection1,
|
||||
State1,
|
||||
Configuration);
|
||||
heartbeat_send ->
|
||||
Frame = rabbit_stream_core:frame(heartbeat),
|
||||
case catch send(Transport, S, Frame) of
|
||||
ok ->
|
||||
listen_loop_post_auth(Transport,
|
||||
Connection,
|
||||
State,
|
||||
Configuration);
|
||||
Unexpected ->
|
||||
rabbit_log_connection:info("Heartbeat send error ~p, closing connection",
|
||||
[Unexpected]),
|
||||
C1 = demonitor_all_streams(Connection),
|
||||
close(Transport, C1, State)
|
||||
end;
|
||||
heartbeat_timeout ->
|
||||
rabbit_log_connection:debug("Heartbeat timeout, closing connection"),
|
||||
C1 = demonitor_all_streams(Connection),
|
||||
close(Transport, C1, State);
|
||||
{infos, From} ->
|
||||
From ! {self(), ClientProperties},
|
||||
listen_loop_post_auth(Transport, Connection, State, Configuration);
|
||||
{'$gen_call', From, info} ->
|
||||
gen_server:reply(From, infos(?INFO_ITEMS, Connection, State)),
|
||||
listen_loop_post_auth(Transport, Connection, State, Configuration);
|
||||
{'$gen_call', From, {info, Items}} ->
|
||||
gen_server:reply(From, infos(Items, Connection, State)),
|
||||
listen_loop_post_auth(Transport, Connection, State, Configuration);
|
||||
{'$gen_call', From, {consumers_info, Items}} ->
|
||||
gen_server:reply(From, consumers_infos(Items, State)),
|
||||
listen_loop_post_auth(Transport, Connection, State, Configuration);
|
||||
{'$gen_call', From, {publishers_info, Items}} ->
|
||||
gen_server:reply(From, publishers_infos(Items, Connection)),
|
||||
listen_loop_post_auth(Transport, Connection, State, Configuration);
|
||||
emit_stats ->
|
||||
Connection1 = emit_stats(Connection, State),
|
||||
listen_loop_post_auth(Transport, Connection1, State, Configuration);
|
||||
{'$gen_cast', {force_event_refresh, Ref}} ->
|
||||
{keep_state, StatemData#statem_data{
|
||||
connection = Connection1,
|
||||
connection_state = State1}};
|
||||
open(cast, {force_event_refresh, Ref}, #statem_data{
|
||||
connection = Connection0,
|
||||
connection_state = State} = StatemData) ->
|
||||
Connection = ensure_stats_timer(Connection0),
|
||||
Infos =
|
||||
augment_infos_with_user_provided_connection_name(infos(?CREATION_EVENT_KEYS,
|
||||
Connection,
|
||||
|
|
@ -751,83 +938,59 @@ listen_loop_post_auth(Transport,
|
|||
Connection1 =
|
||||
rabbit_event:init_stats_timer(Connection,
|
||||
#stream_connection.stats_timer),
|
||||
listen_loop_post_auth(Transport, Connection1, State, Configuration);
|
||||
{'$gen_call', From, {shutdown, Explanation}} ->
|
||||
% likely closing call from the management plugin
|
||||
gen_server:reply(From, ok),
|
||||
rabbit_log_connection:info("Forcing stream connection ~p closing: ~p",
|
||||
[self(), Explanation]),
|
||||
demonitor_all_streams(Connection),
|
||||
rabbit_networking:unregister_non_amqp_connection(self()),
|
||||
notify_connection_closed(Connection, State),
|
||||
close(Transport, S, State),
|
||||
ok;
|
||||
{Closed, S} ->
|
||||
demonitor_all_streams(Connection),
|
||||
rabbit_networking:unregister_non_amqp_connection(self()),
|
||||
notify_connection_closed(Connection, State),
|
||||
rabbit_log_connection:warning("Socket ~w closed [~w]", [S, self()]),
|
||||
ok;
|
||||
{Error, S, Reason} ->
|
||||
demonitor_all_streams(Connection),
|
||||
rabbit_networking:unregister_non_amqp_connection(self()),
|
||||
notify_connection_closed(Connection, State),
|
||||
rabbit_log_connection:error("Socket error ~p [~w] [~w]", [Reason, S, self()]);
|
||||
M ->
|
||||
rabbit_log_connection:warning("Unknown message ~p", [M]),
|
||||
%% FIXME send close
|
||||
listen_loop_post_auth(Transport, Connection, State, Configuration)
|
||||
end.
|
||||
{keep_state, StatemData#statem_data{connection = Connection1}}.
|
||||
|
||||
listen_loop_post_close(Transport,
|
||||
#stream_connection{socket = S} = Connection,
|
||||
State,
|
||||
Configuration) ->
|
||||
{OK, Closed, Error, _Passive} = Transport:messages(),
|
||||
%% FIXME demonitor streams
|
||||
%% FIXME introduce timeout to complete the connection closing (after block should be enough)
|
||||
receive
|
||||
{resource_alarm, IsThereAlarm} ->
|
||||
handle_inbound_data_post_close(Transport,
|
||||
Connection#stream_connection{resource_alarm
|
||||
=
|
||||
IsThereAlarm},
|
||||
State,
|
||||
<<>>);
|
||||
{OK, S, Data} ->
|
||||
Transport:setopts(S, [{active, once}]),
|
||||
{Connection1, State1} =
|
||||
handle_inbound_data_post_close(Transport,
|
||||
close_sent(info, {tcp, S, Data}, #statem_data{
|
||||
transport = Transport,
|
||||
connection = Connection,
|
||||
connection_state = State
|
||||
} = StatemData) when byte_size(Data) > 1 ->
|
||||
{Connection1, State1} = handle_inbound_data_post_close(Transport,
|
||||
Connection,
|
||||
State,
|
||||
Data),
|
||||
#stream_connection{connection_step = Step} = Connection1,
|
||||
rabbit_log_connection:debug("Transitioned from ~s to ~s", [?FUNCTION_NAME, Step]),
|
||||
case Step of
|
||||
closing_done ->
|
||||
rabbit_log_connection:debug("Received close confirmation from client"),
|
||||
close(Transport, S, State),
|
||||
close(Transport, S, State1),
|
||||
rabbit_networking:unregister_non_amqp_connection(self()),
|
||||
notify_connection_closed(Connection1, State1);
|
||||
notify_connection_closed(Connection1, State1),
|
||||
stop;
|
||||
_ ->
|
||||
Transport:setopts(S, [{active, once}]),
|
||||
listen_loop_post_close(Transport,
|
||||
Connection1,
|
||||
State1,
|
||||
Configuration)
|
||||
{keep_state, StatemData#statem_data{
|
||||
connection = Connection1,
|
||||
connection_state = State1
|
||||
}}
|
||||
end;
|
||||
{Closed, S} ->
|
||||
close_sent(info, {tcp_closed, S}, #statem_data{
|
||||
connection = Connection,
|
||||
connection_state = State
|
||||
}) ->
|
||||
rabbit_networking:unregister_non_amqp_connection(self()),
|
||||
notify_connection_closed(Connection, State),
|
||||
rabbit_log_connection:info("Socket ~w closed [~w]", [S, self()]),
|
||||
ok;
|
||||
{Error, S, Reason} ->
|
||||
stop;
|
||||
close_sent(info, {tcp_error, S, Reason}, #statem_data{
|
||||
transport = Transport,
|
||||
connection = Connection,
|
||||
connection_state = State
|
||||
}) ->
|
||||
rabbit_log_connection:info("Socket error ~p [~w] [~w]", [Reason, S, self()]),
|
||||
close(Transport, S, State),
|
||||
rabbit_networking:unregister_non_amqp_connection(self()),
|
||||
notify_connection_closed(Connection, State);
|
||||
M ->
|
||||
rabbit_log_connection:warning("Ignored message on closing ~p", [M])
|
||||
end.
|
||||
notify_connection_closed(Connection, State),
|
||||
stop;
|
||||
close_sent(info,{resource_alarm, IsThereAlarm},
|
||||
StatemData = #statem_data{connection = Connection}) ->
|
||||
rabbit_log:info("Ignored resource_alarm ~p in state ~s", [IsThereAlarm, ?FUNCTION_NAME]),
|
||||
{keep_state, StatemData#statem_data{
|
||||
connection = Connection#stream_connection{resource_alarm = IsThereAlarm}
|
||||
}};
|
||||
close_sent(info, Msg, _StatemData) ->
|
||||
rabbit_log_connection:warning("Ignored unknown message ~p in state ~s", [Msg, ?FUNCTION_NAME]),
|
||||
keep_state_and_data.
|
||||
|
||||
handle_inbound_data_pre_auth(Transport, Connection, State, Data) ->
|
||||
handle_inbound_data(Transport,
|
||||
|
|
@ -2001,7 +2164,7 @@ handle_frame_post_auth(Transport,
|
|||
#stream_connection{socket = S} = Connection,
|
||||
State,
|
||||
Command) ->
|
||||
rabbit_log:warning("unknown command ~p , sending close command.",
|
||||
rabbit_log:warning("unknown command ~p, sending close command.",
|
||||
[Command]),
|
||||
CloseReason = <<"unknown frame">>,
|
||||
Frame =
|
||||
|
|
@ -2039,13 +2202,13 @@ handle_frame_post_close(_Transport,
|
|||
Connection,
|
||||
State,
|
||||
{response, _CorrelationId, {close, _Code}}) ->
|
||||
rabbit_log:info("Received close confirmation"),
|
||||
rabbit_log_connection:info("Received close confirmation from client"),
|
||||
{Connection#stream_connection{connection_step = closing_done}, State};
|
||||
handle_frame_post_close(_Transport, Connection, State, heartbeat) ->
|
||||
rabbit_log:debug("Received heartbeat command post close"),
|
||||
rabbit_log_connection:debug("Received heartbeat command post close"),
|
||||
{Connection, State};
|
||||
handle_frame_post_close(_Transport, Connection, State, Command) ->
|
||||
rabbit_log:warning("ignored command on close ~p .", [Command]),
|
||||
rabbit_log_connection:warning("ignored command on close ~p .", [Command]),
|
||||
{Connection, State}.
|
||||
|
||||
stream_r(Stream, #stream_connection{virtual_host = VHost}) ->
|
||||
|
|
|
|||
Loading…
Reference in New Issue