Prevent mainloop from issuing too many async_recv at a time.

This commit is contained in:
Steve Powell 2012-05-22 17:02:33 +01:00
parent 524883da5d
commit 7e13ade7fa
1 changed files with 16 additions and 10 deletions

View File

@ -25,7 +25,7 @@
-include_lib("amqp_client/include/amqp_client.hrl"). -include_lib("amqp_client/include/amqp_client.hrl").
-record(reader_state, {socket, parse_state, processor, state, iterations, -record(reader_state, {socket, parse_state, processor, state, iterations,
conserve_resources}). conserve_resources, recv_outstanding}).
%%---------------------------------------------------------------------------- %%----------------------------------------------------------------------------
@ -53,7 +53,8 @@ init(SupPid, Configuration) ->
processor = ProcessorPid, processor = ProcessorPid,
state = running, state = running,
iterations = 0, iterations = 0,
conserve_resources = false})), 0), conserve_resources = false,
recv_outstanding = false})), 0),
log(info, "closing STOMP connection ~p (~s)~n", log(info, "closing STOMP connection ~p (~s)~n",
[self(), ConnStr]) [self(), ConnStr])
catch catch
@ -66,11 +67,11 @@ init(SupPid, Configuration) ->
done done
end. end.
mainloop(State = #reader_state{socket = Sock}, ByteCount) -> mainloop(State0 = #reader_state{socket = Sock}, ByteCount) ->
run_socket(State, ByteCount), State = run_socket(State0, ByteCount),
receive receive
{inet_async, Sock, _Ref, {ok, Data}} -> {inet_async, Sock, _Ref, {ok, Data}} ->
process_received_bytes(Data, State); process_received_bytes(Data, State#reader_state{recv_outstanding = false});
{inet_async, _Sock, _Ref, {error, closed}} -> {inet_async, _Sock, _Ref, {error, closed}} ->
ok; ok;
{inet_async, _Sock, _Ref, {error, Reason}} -> {inet_async, _Sock, _Ref, {error, Reason}} ->
@ -125,11 +126,16 @@ next_state(blocking, #stomp_frame{command = "SEND"}) ->
next_state(S, _) -> next_state(S, _) ->
S. S.
run_socket(#reader_state{state = blocked}, _ByteCount) -> run_socket(State = #reader_state{state = blocked}, _ByteCount) ->
ok; State;
run_socket(#reader_state{socket = Sock}, ByteCount) -> run_socket(State = #reader_state{socket = Sock,
rabbit_net:async_recv(Sock, ByteCount, infinity), recv_outstanding = RO},
ok. ByteCount) ->
case RO of
false -> rabbit_net:async_recv(Sock, ByteCount, infinity),
State#reader_state{recv_outstanding = true};
true -> State
end.
%%---------------------------------------------------------------------------- %%----------------------------------------------------------------------------