From 7e13ade7fad837f09ced7776ee7d2bab4c08f565 Mon Sep 17 00:00:00 2001 From: Steve Powell Date: Tue, 22 May 2012 17:02:33 +0100 Subject: [PATCH] Prevent mainloop from issuing too many async_recv at a time. --- .../src/rabbit_stomp_reader.erl | 26 ++++++++++++------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/deps/rabbitmq_stomp/src/rabbit_stomp_reader.erl b/deps/rabbitmq_stomp/src/rabbit_stomp_reader.erl index 28e338259c..38ae62f7fe 100644 --- a/deps/rabbitmq_stomp/src/rabbit_stomp_reader.erl +++ b/deps/rabbitmq_stomp/src/rabbit_stomp_reader.erl @@ -25,7 +25,7 @@ -include_lib("amqp_client/include/amqp_client.hrl"). -record(reader_state, {socket, parse_state, processor, state, iterations, - conserve_resources}). + conserve_resources, recv_outstanding}). %%---------------------------------------------------------------------------- @@ -53,7 +53,8 @@ init(SupPid, Configuration) -> processor = ProcessorPid, state = running, iterations = 0, - conserve_resources = false})), 0), + conserve_resources = false, + recv_outstanding = false})), 0), log(info, "closing STOMP connection ~p (~s)~n", [self(), ConnStr]) catch @@ -66,11 +67,11 @@ init(SupPid, Configuration) -> done end. -mainloop(State = #reader_state{socket = Sock}, ByteCount) -> - run_socket(State, ByteCount), +mainloop(State0 = #reader_state{socket = Sock}, ByteCount) -> + State = run_socket(State0, ByteCount), receive {inet_async, Sock, _Ref, {ok, Data}} -> - process_received_bytes(Data, State); + process_received_bytes(Data, State#reader_state{recv_outstanding = false}); {inet_async, _Sock, _Ref, {error, closed}} -> ok; {inet_async, _Sock, _Ref, {error, Reason}} -> @@ -125,11 +126,16 @@ next_state(blocking, #stomp_frame{command = "SEND"}) -> next_state(S, _) -> S. -run_socket(#reader_state{state = blocked}, _ByteCount) -> - ok; -run_socket(#reader_state{socket = Sock}, ByteCount) -> - rabbit_net:async_recv(Sock, ByteCount, infinity), - ok. +run_socket(State = #reader_state{state = blocked}, _ByteCount) -> + State; +run_socket(State = #reader_state{socket = Sock, + recv_outstanding = RO}, + ByteCount) -> + case RO of + false -> rabbit_net:async_recv(Sock, ByteCount, infinity), + State#reader_state{recv_outstanding = true}; + true -> State + end. %%----------------------------------------------------------------------------