Restore credit_flow between channel/MQTT connection -> CQ processes
The credit_flow between publishing AMQP 0.9.1 channel (or MQTT
connection) and (non-mirrored) classic queue processes was
unintentionally removed in 4.0 together with anything else related to
CQ mirroring.
By default we restore the 3.x behaviour for non-mirored classic
queues. It is possible to disable flow-control (the earlier 4.0.x
behaviour) with the new env `classic_queue_flow_control`. In 3.x this
was possible with the config `mirroring_flow_control`.
(cherry picked from commit d65bd7d07a
)
This commit is contained in:
parent
e777c0b263
commit
2c1f1a1387
|
@ -85,6 +85,8 @@ _APP_ENV = """[
|
|||
{exit_on_close, false}
|
||||
]},
|
||||
{ssl_apps, [asn1, crypto, public_key, ssl]},
|
||||
%% see rabbitmq-server#114
|
||||
{classic_queue_flow_control, true},
|
||||
%% see rabbitmq-server#227 and related tickets.
|
||||
%% msg_store_credit_disc_bound only takes effect when
|
||||
%% messages are persisted to the message store. If messages
|
||||
|
|
|
@ -68,6 +68,8 @@ define PROJECT_ENV
|
|||
{exit_on_close, false}
|
||||
]},
|
||||
{ssl_apps, [asn1, crypto, public_key, ssl]},
|
||||
%% see rabbitmq-server#114
|
||||
{classic_queue_flow_control, true},
|
||||
%% see rabbitmq-server#227 and related tickets.
|
||||
%% msg_store_credit_disc_bound only takes effect when
|
||||
%% messages are persisted to the message store. If messages
|
||||
|
|
|
@ -159,7 +159,8 @@
|
|||
rejected,
|
||||
%% used by "one shot RPC" (amq.
|
||||
reply_consumer :: none | {rabbit_types:ctag(), binary(), binary()},
|
||||
delivery_flow, %% Deprecated since removal of CMQ in 4.0
|
||||
%% see rabbitmq-server#114
|
||||
delivery_flow :: flow | noflow,
|
||||
interceptor_state,
|
||||
queue_states,
|
||||
tick_timer,
|
||||
|
@ -489,6 +490,10 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
|
|||
?LG_PROCESS_TYPE(channel),
|
||||
?store_proc_name({ConnName, Channel}),
|
||||
ok = pg_local:join(rabbit_channels, self()),
|
||||
Flow = case rabbit_misc:get_env(rabbit, classic_queue_flow_control, true) of
|
||||
true -> flow;
|
||||
false -> noflow
|
||||
end,
|
||||
{ok, {Global0, Prefetch}} = application:get_env(rabbit, default_consumer_prefetch),
|
||||
Limiter0 = rabbit_limiter:new(LimiterPid),
|
||||
Global = Global0 andalso is_global_qos_permitted(),
|
||||
|
@ -537,6 +542,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
|
|||
rejected = [],
|
||||
confirmed = [],
|
||||
reply_consumer = none,
|
||||
delivery_flow = Flow,
|
||||
interceptor_state = undefined,
|
||||
queue_states = rabbit_queue_type:init()
|
||||
},
|
||||
|
|
|
@ -449,7 +449,7 @@ deliver(Qs0, Msg0, Options) ->
|
|||
Confirm = MsgSeqNo /= undefined,
|
||||
|
||||
{MPids, Qs} = qpids(Qs0, Confirm, MsgSeqNo),
|
||||
Delivery = rabbit_basic:delivery(Mandatory, Confirm, Msg, MsgSeqNo),
|
||||
Delivery = rabbit_basic:delivery(Mandatory, Confirm, Msg, MsgSeqNo, Flow),
|
||||
|
||||
case Flow of
|
||||
%% Here we are tracking messages sent by the rabbit_channel
|
||||
|
|
|
@ -8,6 +8,7 @@
|
|||
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
-include_lib("amqp_client/include/amqp_client.hrl").
|
||||
-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").
|
||||
|
||||
-compile([nowarn_export_all, export_all]).
|
||||
|
||||
|
@ -18,11 +19,17 @@
|
|||
|
||||
all() ->
|
||||
[
|
||||
{group, cluster_size_1},
|
||||
{group, cluster_size_3}
|
||||
].
|
||||
|
||||
groups() ->
|
||||
[
|
||||
{cluster_size_1, [], [
|
||||
classic_queue_flow_control_enabled,
|
||||
classic_queue_flow_control_disabled
|
||||
]
|
||||
},
|
||||
{cluster_size_3, [], [
|
||||
leader_locator_client_local,
|
||||
leader_locator_balanced,
|
||||
|
@ -42,10 +49,14 @@ end_per_suite(Config) ->
|
|||
rabbit_ct_helpers:run_teardown_steps(Config).
|
||||
|
||||
init_per_group(Group, Config) ->
|
||||
Nodes = case Group of
|
||||
cluster_size_1 -> 1;
|
||||
cluster_size_3 -> 3
|
||||
end,
|
||||
Config1 = rabbit_ct_helpers:set_config(Config,
|
||||
[
|
||||
{rmq_nodename_suffix, Group},
|
||||
{rmq_nodes_count, 3},
|
||||
{rmq_nodes_count, Nodes},
|
||||
{rmq_nodes_clustered, true},
|
||||
{tcp_ports_base, {skip_n_nodes, 3}}
|
||||
]),
|
||||
|
@ -72,6 +83,67 @@ init_per_testcase(T, Config) ->
|
|||
%% Testcases.
|
||||
%% -------------------------------------------------------------------
|
||||
|
||||
classic_queue_flow_control_enabled(Config) ->
|
||||
FlowEnabled = true,
|
||||
VerifyFun =
|
||||
fun(QPid, ConnPid) ->
|
||||
%% Only 2+2 messages reach the message queue of the classic queue.
|
||||
%% (before the credits of the connection and channel processes run out)
|
||||
?awaitMatch(4, proc_info(QPid, message_queue_len), 1000),
|
||||
?assertMatch({0, _}, gen_server2_queue(QPid)),
|
||||
|
||||
%% The connection gets into flow state
|
||||
?assertEqual([{state, flow}], rabbit_reader:info(ConnPid, [state])),
|
||||
|
||||
Dict = proc_info(ConnPid, dictionary),
|
||||
?assertMatch([_|_], proplists:get_value(credit_blocked, Dict)),
|
||||
ok
|
||||
end,
|
||||
flow_control(Config, FlowEnabled, VerifyFun).
|
||||
|
||||
classic_queue_flow_control_disabled(Config) ->
|
||||
FlowEnabled = false,
|
||||
VerifyFun =
|
||||
fun(QPid, ConnPid) ->
|
||||
%% All published messages will end up in the message
|
||||
%% queue of the suspended classic queue process
|
||||
?awaitMatch(100, proc_info(QPid, message_queue_len), 1000),
|
||||
?assertMatch({0, _}, gen_server2_queue(QPid)),
|
||||
|
||||
%% The connection dos not get into flow state
|
||||
?assertEqual([{state, running}], rabbit_reader:info(ConnPid, [state])),
|
||||
|
||||
Dict = proc_info(ConnPid, dictionary),
|
||||
?assertMatch([], proplists:get_value(credit_blocked, Dict, []))
|
||||
end,
|
||||
flow_control(Config, FlowEnabled, VerifyFun).
|
||||
|
||||
flow_control(Config, FlowEnabled, VerifyFun) ->
|
||||
OrigCredit = set_default_credit(Config, {2, 1}),
|
||||
OrigFlow = set_flow_control(Config, FlowEnabled),
|
||||
|
||||
Ch = rabbit_ct_client_helpers:open_channel(Config),
|
||||
QueueName = atom_to_binary(?FUNCTION_NAME),
|
||||
declare(Ch, QueueName, [{<<"x-queue-type">>, longstr, <<"classic">>}]),
|
||||
QPid = get_queue_pid(Config, QueueName),
|
||||
try
|
||||
sys:suspend(QPid),
|
||||
|
||||
%% Publish 100 messages without publisher confirms
|
||||
publish_many(Ch, QueueName, 100),
|
||||
|
||||
[ConnPid] = rabbit_ct_broker_helpers:rpc(Config, rabbit_networking, local_connections, []),
|
||||
|
||||
VerifyFun(QPid, ConnPid),
|
||||
ok
|
||||
after
|
||||
sys:resume(QPid),
|
||||
delete_queues(Ch, [QueueName]),
|
||||
set_default_credit(Config, OrigCredit),
|
||||
set_flow_control(Config, OrigFlow),
|
||||
rabbit_ct_client_helpers:close_channel(Ch)
|
||||
end.
|
||||
|
||||
leader_locator_client_local(Config) ->
|
||||
Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
|
||||
Q = <<"q1">>,
|
||||
|
@ -129,7 +201,55 @@ declare(Ch, Q, Args) ->
|
|||
auto_delete = false,
|
||||
arguments = Args}).
|
||||
|
||||
delete_queues(Ch, Qs) ->
|
||||
[?assertMatch(#'queue.delete_ok'{},
|
||||
amqp_channel:call(Ch, #'queue.delete'{queue = Q}))
|
||||
|| Q <- Qs].
|
||||
|
||||
delete_queues() ->
|
||||
[rabbit_amqqueue:delete(Q, false, false, <<"dummy">>)
|
||||
|| Q <- rabbit_amqqueue:list()].
|
||||
|
||||
|
||||
publish(Ch, QName, Payload) ->
|
||||
amqp_channel:cast(Ch,
|
||||
#'basic.publish'{exchange = <<>>,
|
||||
routing_key = QName},
|
||||
#amqp_msg{payload = Payload}).
|
||||
|
||||
publish_many(Ch, QName, Count) ->
|
||||
[publish(Ch, QName, integer_to_binary(I))
|
||||
|| I <- lists:seq(1, Count)].
|
||||
|
||||
proc_info(Pid, Info) ->
|
||||
case rabbit_misc:process_info(Pid, Info) of
|
||||
{Info, Value} ->
|
||||
Value;
|
||||
Error ->
|
||||
{error, Error}
|
||||
end.
|
||||
|
||||
gen_server2_queue(Pid) ->
|
||||
Status = sys:get_status(Pid),
|
||||
{status, Pid,_Mod,
|
||||
[_Dict, _SysStatus, _Parent, _Dbg,
|
||||
[{header, _},
|
||||
{data, Data}|_]]} = Status,
|
||||
proplists:get_value("Queued messages", Data).
|
||||
|
||||
set_default_credit(Config, Value) ->
|
||||
Key = credit_flow_default_credit,
|
||||
OrigValue = rabbit_ct_broker_helpers:rpc(Config, persistent_term, get, [Key]),
|
||||
ok = rabbit_ct_broker_helpers:rpc(Config, persistent_term, put, [Key, Value]),
|
||||
OrigValue.
|
||||
|
||||
set_flow_control(Config, Value) when is_boolean(Value) ->
|
||||
Key = classic_queue_flow_control,
|
||||
{ok, OrigValue} = rabbit_ct_broker_helpers:rpc(Config, application, get_env, [rabbit, Key]),
|
||||
rabbit_ct_broker_helpers:rpc(Config, application, set_env, [rabbit, Key, Value]),
|
||||
OrigValue.
|
||||
|
||||
get_queue_pid(Config, QueueName) ->
|
||||
{ok, QRec} = rabbit_ct_broker_helpers:rpc(
|
||||
Config, 0, rabbit_amqqueue, lookup, [QueueName, <<"/">>]),
|
||||
amqqueue:get_pid(QRec).
|
||||
|
|
|
@ -72,6 +72,7 @@
|
|||
published = false :: boolean(),
|
||||
ssl_login_name :: none | binary(),
|
||||
retainer_pid :: pid(),
|
||||
delivery_flow :: flow | noflow,
|
||||
trace_state :: rabbit_trace:state(),
|
||||
prefetch :: non_neg_integer(),
|
||||
vhost :: rabbit_types:vhost(),
|
||||
|
@ -148,6 +149,10 @@ process_connect(
|
|||
"protocol version: ~p, keepalive: ~p, property names: ~p",
|
||||
[ClientId0, Username0, CleanStart, ProtoVer, KeepaliveSecs, maps:keys(ConnectProps)]),
|
||||
SslLoginName = ssl_login_name(Socket),
|
||||
Flow = case rabbit_misc:get_env(rabbit, classic_queue_flow_control, true) of
|
||||
true -> flow;
|
||||
false -> noflow
|
||||
end,
|
||||
MaxPacketSize = maps:get('Maximum-Packet-Size', ConnectProps, ?MAX_PACKET_SIZE),
|
||||
TopicAliasMax = persistent_term:get(?PERSISTENT_TERM_TOPIC_ALIAS_MAXIMUM),
|
||||
TopicAliasMaxOutbound = min(maps:get('Topic-Alias-Maximum', ConnectProps, 0), TopicAliasMax),
|
||||
|
@ -208,6 +213,7 @@ process_connect(
|
|||
clean_start = CleanStart,
|
||||
session_expiry_interval_secs = SessionExpiry,
|
||||
ssl_login_name = SslLoginName,
|
||||
delivery_flow = Flow,
|
||||
trace_state = TraceState,
|
||||
prefetch = prefetch(ConnectProps),
|
||||
conn_name = ConnName,
|
||||
|
@ -1552,6 +1558,7 @@ publish_to_queues(
|
|||
#mqtt_msg{topic = Topic,
|
||||
packet_id = PacketId} = MqttMsg,
|
||||
#state{cfg = #cfg{exchange = ExchangeName = #resource{name = ExchangeNameBin},
|
||||
delivery_flow = Flow,
|
||||
conn_name = ConnName,
|
||||
trace_state = TraceState},
|
||||
auth_state = #auth_state{user = #user{username = Username}}} = State) ->
|
||||
|
@ -1564,7 +1571,7 @@ publish_to_queues(
|
|||
QNames0 = rabbit_exchange:route(Exchange, Msg, #{return_binding_keys => true}),
|
||||
QNames = drop_local(QNames0, State),
|
||||
rabbit_trace:tap_in(Msg, QNames, ConnName, Username, TraceState),
|
||||
Opts = maps_put_truthy(correlation, PacketId, #{}),
|
||||
Opts = maps_put_truthy(flow, Flow, maps_put_truthy(correlation, PacketId, #{})),
|
||||
deliver_to_queues(Msg, Opts, QNames, State);
|
||||
{error, not_found} ->
|
||||
?LOG_ERROR("~s not found", [rabbit_misc:rs(ExchangeName)]),
|
||||
|
@ -2478,6 +2485,7 @@ format_status(
|
|||
published = Published,
|
||||
ssl_login_name = SSLLoginName,
|
||||
retainer_pid = RetainerPid,
|
||||
delivery_flow = DeliveryFlow,
|
||||
trace_state = TraceState,
|
||||
prefetch = Prefetch,
|
||||
client_id = ClientID,
|
||||
|
@ -2499,6 +2507,7 @@ format_status(
|
|||
ssl_login_name => SSLLoginName,
|
||||
retainer_pid => RetainerPid,
|
||||
|
||||
delivery_flow => DeliveryFlow,
|
||||
trace_state => TraceState,
|
||||
prefetch => Prefetch,
|
||||
client_id => ClientID,
|
||||
|
|
|
@ -131,6 +131,7 @@ cluster_size_3_tests() ->
|
|||
pubsub,
|
||||
queue_down_qos1,
|
||||
consuming_classic_queue_down,
|
||||
flow_classic_queue,
|
||||
flow_quorum_queue,
|
||||
flow_stream,
|
||||
rabbit_mqtt_qos0_queue,
|
||||
|
@ -486,6 +487,24 @@ publish_to_all_non_deprecated_queue_types(Config, QoS) ->
|
|||
?awaitMatch([],
|
||||
all_connection_pids(Config), 10_000, 1000).
|
||||
|
||||
%% This test case does not require multiple nodes
|
||||
%% but it is grouped together with flow test cases for other queue types
|
||||
%% (and historically used to use a mirrored classic queue on multiple nodes)
|
||||
flow_classic_queue(Config) ->
|
||||
%% New nodes lookup via persistent_term:get/1 (since 4.0.0)
|
||||
%% Old nodes lookup via application:get_env/2. (that is taken care of by flow/3)
|
||||
%% Therefore, we set both persistent_term and application.
|
||||
Key = credit_flow_default_credit,
|
||||
Val = {2, 1},
|
||||
DefaultVal = rabbit_ct_broker_helpers:rpc(Config, persistent_term, get, [Key]),
|
||||
Result = rpc_all(Config, persistent_term, put, [Key, Val]),
|
||||
?assert(lists:all(fun(R) -> R =:= ok end, Result)),
|
||||
|
||||
flow(Config, {rabbit, Key, Val}, <<"classic">>),
|
||||
|
||||
?assertEqual(Result, rpc_all(Config, persistent_term, put, [Key, DefaultVal])),
|
||||
ok.
|
||||
|
||||
flow_quorum_queue(Config) ->
|
||||
flow(Config, {rabbit, quorum_commands_soft_limit, 1}, <<"quorum">>).
|
||||
|
||||
|
|
Loading…
Reference in New Issue