Receive many messages from classic queue

Before this commit, a consumer from a classic queue was receiving max
200 messages:
bb5d6263c9/deps/rabbit/src/rabbit_queue_consumers.erl (L24)

MQTT consumer process must give credit to classic queue process
due to internal flow control.
This commit is contained in:
David Ansari 2022-09-20 08:39:16 +00:00
parent 99337b84d3
commit cdd253ee87
2 changed files with 19 additions and 11 deletions

View File

@ -1234,15 +1234,13 @@ deliver_to_queues(Delivery = #delivery{message = _Message = #basic_message{ex
PState#proc_state{queue_states = QueueStates}.
serialise_and_send_to_client(Frame, #proc_state{proto_ver = ProtoVer, socket = Sock }) ->
%%TODO Test sending large frames at high speed: Will we need garbage collection as done
%% in rabbit_writer:maybe_gc_large_msg()?
try rabbit_net:port_command(Sock, rabbit_mqtt_frame:serialise(Frame, ProtoVer)) of
Res ->
Res
%%TODO Test sending large frames at high speed:
%% Will we need garbage collection as done in rabbit_writer:maybe_gc_large_msg/1?
try rabbit_net:port_command(Sock, rabbit_mqtt_frame:serialise(Frame, ProtoVer))
catch _:Error ->
rabbit_log_connection:error("MQTT: a socket write failed, the socket might already be closed"),
rabbit_log_connection:debug("Failed to write to socket ~tp, error: ~tp, frame: ~tp",
[Sock, Error, Frame])
rabbit_log_connection:error("MQTT: a socket write failed, the socket might already be closed"),
rabbit_log_connection:debug("Failed to write to socket ~p, error: ~p, frame: ~p",
[Sock, Error, Frame])
end.
terminate(#proc_state{client_id = undefined}) ->
@ -1315,13 +1313,14 @@ handle_deliver(Msgs, PState)
handle_deliver0(Msg, S)
end, PState, Msgs).
handle_deliver0({_QName, _QPid, _MsgId, Redelivered,
handle_deliver0({QName, QPid, _MsgId, Redelivered,
#basic_message{routing_keys = [RoutingKey | _CcRoutes],
content = #content{
properties = #'P_basic'{headers = Headers},
payload_fragments_rev = FragmentsRev}}},
PState = #proc_state{send_fun = SendFun,
amqp2mqtt_fun = Amqp2MqttFun}) ->
amqp2mqtt_fun = Amqp2MqttFun,
queue_states = QStates}) ->
Dup = case rabbit_mqtt_util:table_lookup(Headers, <<"x-mqtt-dup">>) of
undefined -> Redelivered;
{bool, Dup0} -> Redelivered orelse Dup0
@ -1337,6 +1336,15 @@ handle_deliver0({_QName, _QPid, _MsgId, Redelivered,
topic_name = Amqp2MqttFun(RoutingKey)},
payload = Payload},
SendFun(Frame, PState),
{ok, QueueType} = rabbit_queue_type:module(QName, QStates),
case QueueType of
rabbit_classic_queue ->
rabbit_amqqueue:notify_sent(QPid, self());
_ ->
ok
end,
PState.
publish(TopicName, PublishFun,

View File

@ -740,7 +740,7 @@ public class MqttTest implements MqttCallback {
}
@Test public void publishMultiple() throws MqttException, InterruptedException {
int pubCount = 50;
int pubCount = 1000;
for (int subQos=0; subQos <= 2; subQos++){
for (int pubQos=0; pubQos <= 2; pubQos++){
// avoid reusing the client in this test as a shared