Merge branch 'stable'
This commit is contained in:
commit
35c7a8d9ce
|
@ -70,7 +70,7 @@
|
|||
-export([interval_operation/5]).
|
||||
-export([ensure_timer/4, stop_timer/2, send_after/3, cancel_timer/1]).
|
||||
-export([get_parent/0]).
|
||||
-export([store_proc_name/1, store_proc_name/2]).
|
||||
-export([store_proc_name/1, store_proc_name/2, get_proc_name/0]).
|
||||
-export([moving_average/4]).
|
||||
-export([get_env/3]).
|
||||
-export([get_channel_operation_timeout/0]).
|
||||
|
@ -256,6 +256,7 @@
|
|||
-spec get_parent() -> pid().
|
||||
-spec store_proc_name(atom(), rabbit_types:proc_name()) -> ok.
|
||||
-spec store_proc_name(rabbit_types:proc_type_and_name()) -> ok.
|
||||
-spec get_proc_name() -> rabbit_types:proc_name().
|
||||
-spec moving_average(float(), float(), float(), float() | 'undefined') ->
|
||||
float().
|
||||
-spec get_env(atom(), atom(), term()) -> term().
|
||||
|
@ -1122,6 +1123,14 @@ cancel_timer({timer, Ref}) -> {ok, cancel} = timer:cancel(Ref),
|
|||
store_proc_name(Type, ProcName) -> store_proc_name({Type, ProcName}).
|
||||
store_proc_name(TypeProcName) -> put(process_name, TypeProcName).
|
||||
|
||||
get_proc_name() ->
|
||||
case get(process_name) of
|
||||
undefined ->
|
||||
undefined;
|
||||
{_Type, Name} ->
|
||||
{ok, Name}
|
||||
end.
|
||||
|
||||
%% application:get_env/3 is only available in R16B01 or later.
|
||||
get_env(Application, Key, Def) ->
|
||||
case application:get_env(Application, Key) of
|
||||
|
|
|
@ -106,6 +106,9 @@
|
|||
-record(connection, {
|
||||
%% e.g. <<"127.0.0.1:55054 -> 127.0.0.1:5672">>
|
||||
name,
|
||||
%% used for logging: same as `name`, but optionally
|
||||
%% augmented with user-supplied name
|
||||
log_name,
|
||||
%% server host
|
||||
host,
|
||||
%% client host
|
||||
|
@ -338,7 +341,7 @@ socket_op(Sock, Fun) ->
|
|||
start_connection(Parent, HelperSup, Deb, Sock) ->
|
||||
process_flag(trap_exit, true),
|
||||
Name = case rabbit_net:connection_string(Sock, inbound) of
|
||||
{ok, Str} -> Str;
|
||||
{ok, Str} -> list_to_binary(Str);
|
||||
{error, enotconn} -> rabbit_net:fast_close(Sock),
|
||||
exit(normal);
|
||||
{error, Reason} -> socket_error(Reason),
|
||||
|
@ -350,11 +353,12 @@ start_connection(Parent, HelperSup, Deb, Sock) ->
|
|||
erlang:send_after(HandshakeTimeout, self(), handshake_timeout),
|
||||
{PeerHost, PeerPort, Host, Port} =
|
||||
socket_op(Sock, fun (S) -> rabbit_net:socket_ends(S, inbound) end),
|
||||
?store_proc_name(list_to_binary(Name)),
|
||||
?store_proc_name(Name),
|
||||
State = #v1{parent = Parent,
|
||||
sock = Sock,
|
||||
connection = #connection{
|
||||
name = list_to_binary(Name),
|
||||
name = Name,
|
||||
log_name = Name,
|
||||
host = Host,
|
||||
peer_host = PeerHost,
|
||||
port = Port,
|
||||
|
@ -391,10 +395,10 @@ start_connection(Parent, HelperSup, Deb, Sock) ->
|
|||
State, #v1.stats_timer),
|
||||
handshake, 8)]}),
|
||||
rabbit_log_connection:info("closing AMQP connection ~p (~s)~n",
|
||||
[self(), Name])
|
||||
[self(), dynamic_connection_name(Name)])
|
||||
catch
|
||||
Ex ->
|
||||
log_connection_exception(Name, Ex)
|
||||
log_connection_exception(dynamic_connection_name(Name), Ex)
|
||||
after
|
||||
%% We don't call gen_tcp:close/1 here since it waits for
|
||||
%% pending output to be sent, which results in unnecessary
|
||||
|
@ -704,9 +708,9 @@ wait_for_channel_termination(0, TimerRef, State) ->
|
|||
wait_for_channel_termination(N, TimerRef,
|
||||
State = #v1{connection_state = CS,
|
||||
connection = #connection{
|
||||
name = ConnName,
|
||||
user = User,
|
||||
vhost = VHost},
|
||||
log_name = ConnName,
|
||||
user = User,
|
||||
vhost = VHost},
|
||||
sock = Sock}) ->
|
||||
receive
|
||||
{'DOWN', _MRef, process, ChPid, Reason} ->
|
||||
|
@ -755,7 +759,7 @@ format_hard_error(Reason) ->
|
|||
|
||||
log_hard_error(#v1{connection_state = CS,
|
||||
connection = #connection{
|
||||
name = ConnName,
|
||||
log_name = ConnName,
|
||||
user = User,
|
||||
vhost = VHost}}, Channel, Reason) ->
|
||||
rabbit_log_connection:error(
|
||||
|
@ -773,7 +777,7 @@ handle_exception(State = #v1{connection = #connection{protocol = Protocol},
|
|||
respond_and_close(State, Channel, Protocol, Reason, Reason);
|
||||
%% authentication failure
|
||||
handle_exception(State = #v1{connection = #connection{protocol = Protocol,
|
||||
name = ConnName,
|
||||
log_name = ConnName,
|
||||
capabilities = Capabilities},
|
||||
connection_state = starting},
|
||||
Channel, Reason = #amqp_error{name = access_refused,
|
||||
|
@ -792,7 +796,7 @@ handle_exception(State = #v1{connection = #connection{protocol = Protocol,
|
|||
%% when loopback-only user tries to connect from a non-local host
|
||||
%% when user tries to access a vhost it has no permissions for
|
||||
handle_exception(State = #v1{connection = #connection{protocol = Protocol,
|
||||
name = ConnName,
|
||||
log_name = ConnName,
|
||||
user = User},
|
||||
connection_state = opening},
|
||||
Channel, Reason = #amqp_error{name = not_allowed,
|
||||
|
@ -809,7 +813,7 @@ handle_exception(State = #v1{connection = #connection{protocol = Protocol},
|
|||
%% when negotiation fails, e.g. due to channel_max being higher than the
|
||||
%% maxiumum allowed limit
|
||||
handle_exception(State = #v1{connection = #connection{protocol = Protocol,
|
||||
name = ConnName,
|
||||
log_name = ConnName,
|
||||
user = User},
|
||||
connection_state = tuning},
|
||||
Channel, Reason = #amqp_error{name = not_allowed,
|
||||
|
@ -1094,7 +1098,7 @@ handle_method0(#'connection.start_ok'{mechanism = Mechanism,
|
|||
response = Response,
|
||||
client_properties = ClientProperties},
|
||||
State0 = #v1{connection_state = starting,
|
||||
connection = Connection,
|
||||
connection = Connection0,
|
||||
sock = Sock}) ->
|
||||
AuthMechanism = auth_mechanism_to_module(Mechanism, Sock),
|
||||
Capabilities =
|
||||
|
@ -1102,13 +1106,14 @@ handle_method0(#'connection.start_ok'{mechanism = Mechanism,
|
|||
{table, Capabilities1} -> Capabilities1;
|
||||
_ -> []
|
||||
end,
|
||||
Connection1 = Connection0#connection{
|
||||
client_properties = ClientProperties,
|
||||
capabilities = Capabilities,
|
||||
auth_mechanism = {Mechanism, AuthMechanism},
|
||||
auth_state = AuthMechanism:init(Sock)},
|
||||
Connection2 = augment_connection_log_name(Connection1),
|
||||
State = State0#v1{connection_state = securing,
|
||||
connection =
|
||||
Connection#connection{
|
||||
client_properties = ClientProperties,
|
||||
capabilities = Capabilities,
|
||||
auth_mechanism = {Mechanism, AuthMechanism},
|
||||
auth_state = AuthMechanism:init(Sock)}},
|
||||
connection = Connection2},
|
||||
auth_phase(Response, State);
|
||||
|
||||
handle_method0(#'connection.secure_ok'{response = Response},
|
||||
|
@ -1597,3 +1602,23 @@ control_throttle(State = #v1{connection_state = CS,
|
|||
blocked -> maybe_block(maybe_unblock(State1));
|
||||
_ -> State1
|
||||
end.
|
||||
|
||||
augment_connection_log_name(#connection{client_properties = ClientProperties,
|
||||
name = Name} = Connection) ->
|
||||
case rabbit_misc:table_lookup(ClientProperties, <<"connection_name">>) of
|
||||
{longstr, UserSpecifiedName} ->
|
||||
LogName = <<Name/binary, " - ", UserSpecifiedName/binary>>,
|
||||
rabbit_log_connection:info("Connection ~p (~s) has a client-provided name: ~s~n", [self(), Name, UserSpecifiedName]),
|
||||
?store_proc_name(LogName),
|
||||
Connection#connection{log_name = LogName};
|
||||
_ ->
|
||||
Connection
|
||||
end.
|
||||
|
||||
dynamic_connection_name(Default) ->
|
||||
case rabbit_misc:get_proc_name() of
|
||||
{ok, Name} ->
|
||||
Name;
|
||||
_ ->
|
||||
Default
|
||||
end.
|
||||
|
|
Loading…
Reference in New Issue