See #7593. Use connection_max to stop connections in rabbitmq
This commit is contained in:
parent
c88ec8a86c
commit
b42e99acfe
|
@ -39,7 +39,7 @@ _APP_ENV = """[
|
||||||
{frame_max, 131072},
|
{frame_max, 131072},
|
||||||
%% see rabbitmq-server#1593
|
%% see rabbitmq-server#1593
|
||||||
{channel_max, 2047},
|
{channel_max, 2047},
|
||||||
{connection_max, infinity},
|
{ranch_connection_max, infinity},
|
||||||
{heartbeat, 60},
|
{heartbeat, 60},
|
||||||
{msg_store_file_size_limit, 16777216},
|
{msg_store_file_size_limit, 16777216},
|
||||||
{msg_store_shutdown_timeout, 600000},
|
{msg_store_shutdown_timeout, 600000},
|
||||||
|
@ -507,6 +507,11 @@ rabbitmq_integration_suite(
|
||||||
size = "medium",
|
size = "medium",
|
||||||
)
|
)
|
||||||
|
|
||||||
|
rabbitmq_integration_suite(
|
||||||
|
name = "per_node_limit_SUITE",
|
||||||
|
size = "medium",
|
||||||
|
)
|
||||||
|
|
||||||
rabbitmq_integration_suite(
|
rabbitmq_integration_suite(
|
||||||
name = "metrics_SUITE",
|
name = "metrics_SUITE",
|
||||||
size = "medium",
|
size = "medium",
|
||||||
|
|
|
@ -26,7 +26,7 @@ define PROJECT_ENV
|
||||||
{frame_max, 131072},
|
{frame_max, 131072},
|
||||||
%% see rabbitmq-server#1593
|
%% see rabbitmq-server#1593
|
||||||
{channel_max, 2047},
|
{channel_max, 2047},
|
||||||
{connection_max, infinity},
|
{ranch_connection_max, infinity},
|
||||||
{heartbeat, 60},
|
{heartbeat, 60},
|
||||||
{msg_store_file_size_limit, 16777216},
|
{msg_store_file_size_limit, 16777216},
|
||||||
{msg_store_shutdown_timeout, 600000},
|
{msg_store_shutdown_timeout, 600000},
|
||||||
|
|
|
@ -883,6 +883,20 @@ end}.
|
||||||
end
|
end
|
||||||
}.
|
}.
|
||||||
|
|
||||||
|
{mapping, "ranch_connection_max", "rabbit.ranch_connection_max",
|
||||||
|
[{datatype, [{atom, infinity}, integer]}]}.
|
||||||
|
|
||||||
|
{translation, "rabbit.ranch_connection_max",
|
||||||
|
fun(Conf) ->
|
||||||
|
case cuttlefish:conf_get("ranch_connection_max", Conf, undefined) of
|
||||||
|
undefined -> cuttlefish:unset();
|
||||||
|
infinity -> infinity;
|
||||||
|
Val when is_integer(Val) -> Val;
|
||||||
|
_ -> cuttlefish:invalid("should be a non-negative integer")
|
||||||
|
end
|
||||||
|
end
|
||||||
|
}.
|
||||||
|
|
||||||
|
|
||||||
{mapping, "max_message_size", "rabbit.max_message_size",
|
{mapping, "max_message_size", "rabbit.max_message_size",
|
||||||
[{datatype, integer}, {validators, ["max_message_size"]}]}.
|
[{datatype, integer}, {validators, ["max_message_size"]}]}.
|
||||||
|
|
|
@ -1212,12 +1212,13 @@ handle_method0(#'connection.open'{virtual_host = VHost},
|
||||||
State = #v1{connection_state = opening,
|
State = #v1{connection_state = opening,
|
||||||
connection = Connection = #connection{
|
connection = Connection = #connection{
|
||||||
log_name = ConnName,
|
log_name = ConnName,
|
||||||
|
port = Port,
|
||||||
user = User = #user{username = Username},
|
user = User = #user{username = Username},
|
||||||
protocol = Protocol},
|
protocol = Protocol},
|
||||||
helper_sup = SupPid,
|
helper_sup = SupPid,
|
||||||
sock = Sock,
|
sock = Sock,
|
||||||
throttle = Throttle}) ->
|
throttle = Throttle}) ->
|
||||||
|
ok = is_over_node_connection_limit(Port),
|
||||||
ok = is_over_vhost_connection_limit(VHost, User),
|
ok = is_over_vhost_connection_limit(VHost, User),
|
||||||
ok = is_over_user_connection_limit(User),
|
ok = is_over_user_connection_limit(User),
|
||||||
ok = rabbit_access_control:check_vhost_access(User, VHost, {socket, Sock}, #{}),
|
ok = rabbit_access_control:check_vhost_access(User, VHost, {socket, Sock}, #{}),
|
||||||
|
@ -1318,6 +1319,20 @@ is_vhost_alive(VHostPath, User) ->
|
||||||
[VHostPath, User#user.username, VHostPath])
|
[VHostPath, User#user.username, VHostPath])
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
is_over_node_connection_limit(Port) ->
|
||||||
|
{Addr, _, _} = hd(rabbit_networking:tcp_listener_addresses(Port)),
|
||||||
|
Ref = rabbit_networking:ranch_ref(Addr, Port),
|
||||||
|
#{active_connections := ActiveConns} = ranch:info(Ref),
|
||||||
|
Limit = rabbit_misc:get_env(rabbit, connection_max, infinity),
|
||||||
|
case ActiveConns > Limit of
|
||||||
|
false -> ok;
|
||||||
|
true ->
|
||||||
|
rabbit_misc:protocol_error(not_allowed,
|
||||||
|
"connection refused: "
|
||||||
|
"node connection limit (~tp) is reached",
|
||||||
|
[Limit])
|
||||||
|
end.
|
||||||
|
|
||||||
is_over_vhost_connection_limit(VHostPath, User) ->
|
is_over_vhost_connection_limit(VHostPath, User) ->
|
||||||
try rabbit_vhost_limit:is_over_connection_limit(VHostPath) of
|
try rabbit_vhost_limit:is_over_connection_limit(VHostPath) of
|
||||||
false -> ok;
|
false -> ok;
|
||||||
|
|
|
@ -35,7 +35,7 @@ start_link(IPAddress, Port, Transport, SocketOpts, ProtoSup, ProtoOpts, OnStartu
|
||||||
init({IPAddress, Port, Transport, SocketOpts, ProtoSup, ProtoOpts, OnStartup, OnShutdown,
|
init({IPAddress, Port, Transport, SocketOpts, ProtoSup, ProtoOpts, OnStartup, OnShutdown,
|
||||||
ConcurrentAcceptorCount, ConcurrentConnsSups, ConnectionType, Label}) ->
|
ConcurrentAcceptorCount, ConcurrentConnsSups, ConnectionType, Label}) ->
|
||||||
{ok, AckTimeout} = application:get_env(rabbit, ssl_handshake_timeout),
|
{ok, AckTimeout} = application:get_env(rabbit, ssl_handshake_timeout),
|
||||||
MaxConnections = max_conn(rabbit_misc:get_env(rabbit, connection_max, infinity),
|
MaxConnections = max_conn(rabbit_misc:get_env(rabbit, ranch_connection_max, infinity),
|
||||||
ConcurrentConnsSups),
|
ConcurrentConnsSups),
|
||||||
RanchListenerOpts = #{
|
RanchListenerOpts = #{
|
||||||
num_acceptors => ConcurrentAcceptorCount,
|
num_acceptors => ConcurrentAcceptorCount,
|
||||||
|
|
|
@ -287,13 +287,13 @@ tcp_listen_options.exit_on_close = false",
|
||||||
"total_memory_available_override_value = 1024MB",
|
"total_memory_available_override_value = 1024MB",
|
||||||
[{rabbit,[{total_memory_available_override_value, "1024MB"}]}],
|
[{rabbit,[{total_memory_available_override_value, "1024MB"}]}],
|
||||||
[]},
|
[]},
|
||||||
{connection_max,
|
{ranch_connection_max,
|
||||||
"connection_max = 999",
|
"ranch_connection_max = 999",
|
||||||
[{rabbit,[{connection_max, 999}]}],
|
[{rabbit,[{ranch_connection_max, 999}]}],
|
||||||
[]},
|
[]},
|
||||||
{connection_max,
|
{ranch_connection_max,
|
||||||
"connection_max = infinity",
|
"ranch_connection_max = infinity",
|
||||||
[{rabbit,[{connection_max, infinity}]}],
|
[{rabbit,[{ranch_connection_max, infinity}]}],
|
||||||
[]},
|
[]},
|
||||||
{channel_max,
|
{channel_max,
|
||||||
"channel_max = 16",
|
"channel_max = 16",
|
||||||
|
|
|
@ -0,0 +1,99 @@
|
||||||
|
%% This Source Code Form is subject to the terms of the Mozilla Public
|
||||||
|
%% License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||||
|
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
|
||||||
|
%%
|
||||||
|
%% Copyright (c) 2011-2023 VMware, Inc. or its affiliates. All rights reserved.
|
||||||
|
%%
|
||||||
|
|
||||||
|
-module(per_node_limit_SUITE).
|
||||||
|
|
||||||
|
-include_lib("common_test/include/ct.hrl").
|
||||||
|
-include_lib("amqp_client/include/amqp_client.hrl").
|
||||||
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
|
||||||
|
-compile(export_all).
|
||||||
|
|
||||||
|
all() ->
|
||||||
|
[
|
||||||
|
{group, parallel_tests}
|
||||||
|
].
|
||||||
|
|
||||||
|
groups() ->
|
||||||
|
[
|
||||||
|
{parallel_tests, [parallel], [
|
||||||
|
node_connection_limit
|
||||||
|
]}
|
||||||
|
].
|
||||||
|
|
||||||
|
suite() ->
|
||||||
|
[
|
||||||
|
{timetrap, {minutes, 3}}
|
||||||
|
].
|
||||||
|
|
||||||
|
%% -------------------------------------------------------------------
|
||||||
|
%% Testsuite setup/teardown.
|
||||||
|
%% -------------------------------------------------------------------
|
||||||
|
|
||||||
|
init_per_suite(Config) ->
|
||||||
|
rabbit_ct_helpers:log_environment(),
|
||||||
|
rabbit_ct_helpers:run_setup_steps(Config).
|
||||||
|
|
||||||
|
end_per_suite(Config) ->
|
||||||
|
rabbit_ct_helpers:run_teardown_steps(Config).
|
||||||
|
|
||||||
|
init_per_group(Group, Config) ->
|
||||||
|
Config1 = rabbit_ct_helpers:set_config(Config, [
|
||||||
|
{rmq_nodename_suffix, Group},
|
||||||
|
{rmq_nodes_count, 1}
|
||||||
|
]),
|
||||||
|
rabbit_ct_helpers:run_steps(Config1,
|
||||||
|
rabbit_ct_broker_helpers:setup_steps() ++
|
||||||
|
rabbit_ct_client_helpers:setup_steps()).
|
||||||
|
|
||||||
|
end_per_group(_Group, Config) ->
|
||||||
|
rabbit_ct_helpers:run_steps(Config,
|
||||||
|
rabbit_ct_client_helpers:teardown_steps() ++
|
||||||
|
rabbit_ct_broker_helpers:teardown_steps()).
|
||||||
|
|
||||||
|
init_per_testcase(Testcase, Config) ->
|
||||||
|
rabbit_ct_helpers:testcase_started(Config, Testcase).
|
||||||
|
|
||||||
|
end_per_testcase(Testcase, Config) ->
|
||||||
|
rabbit_ct_helpers:testcase_finished(Config, Testcase).
|
||||||
|
|
||||||
|
%% -------------------------------------------------------------------
|
||||||
|
%% Test cases
|
||||||
|
%% -------------------------------------------------------------------
|
||||||
|
|
||||||
|
node_connection_limit(Config) ->
|
||||||
|
%% Set limit to 0, don't accept any connections
|
||||||
|
set_node_limit(Config, 0),
|
||||||
|
{error, not_allowed} = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0),
|
||||||
|
|
||||||
|
%% Set limit to 5, accept 5 connections
|
||||||
|
Connections = open_connections_to_limit(Config, 5),
|
||||||
|
%% But no more than 5
|
||||||
|
{error, not_allowed} = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0),
|
||||||
|
close_all_connections(Connections),
|
||||||
|
|
||||||
|
set_node_limit(Config, infinity),
|
||||||
|
C = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0),
|
||||||
|
true = is_pid(C).
|
||||||
|
|
||||||
|
%% -------------------------------------------------------------------
|
||||||
|
%% Implementation
|
||||||
|
%% -------------------------------------------------------------------
|
||||||
|
|
||||||
|
open_connections_to_limit(Config, Limit) ->
|
||||||
|
set_node_limit(Config, Limit),
|
||||||
|
Connections = [rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0) || _ <- lists:seq(1,Limit)],
|
||||||
|
true = lists:all(fun(E) -> is_pid(E) end, Connections),
|
||||||
|
Connections.
|
||||||
|
|
||||||
|
close_all_connections(Connections) ->
|
||||||
|
[rabbit_ct_client_helpers:close_connection(C) || C <- Connections].
|
||||||
|
|
||||||
|
set_node_limit(Config, Limit) ->
|
||||||
|
rabbit_ct_broker_helpers:rpc(Config, 0,
|
||||||
|
application,
|
||||||
|
set_env, [rabbit, connection_max, Limit]).
|
Loading…
Reference in New Issue