diff --git a/deps/rabbitmq_stomp/src/rabbit_stomp_processor.erl b/deps/rabbitmq_stomp/src/rabbit_stomp_processor.erl index 2b64850b13..cddc39e3c3 100644 --- a/deps/rabbitmq_stomp/src/rabbit_stomp_processor.erl +++ b/deps/rabbitmq_stomp/src/rabbit_stomp_processor.erl @@ -58,13 +58,13 @@ adapter_name(State) -> PeerAddr :: inet:ip_address(). -type process_frame_result() :: - {ok, #proc_state{}} | + {ok, term(), #proc_state{}} | {stop, term(), #proc_state{}}. -spec process_frame(#stomp_frame{}, #proc_state{}) -> process_frame_result(). --spec flush_and_die(#proc_state{}) -> ok. +-spec flush_and_die(#proc_state{}) -> #proc_state{}. -spec command({Command, Frame}, State) -> process_frame_result() when Command :: string(), @@ -212,7 +212,7 @@ handle_exit(Conn, {shutdown, {connection_closing, State = #proc_state{connection = Conn}) -> amqp_death(Code, Explanation, State); handle_exit(Conn, Reason, State = #proc_state{connection = Conn}) -> - send_error("AMQP connection died", "Reason: ~p", [Reason], State), + _ = send_error("AMQP connection died", "Reason: ~p", [Reason], State), {stop, {conn_died, Reason}, State}; handle_exit(Ch, {shutdown, {server_initiated_close, Code, Explanation}}, @@ -220,7 +220,7 @@ handle_exit(Ch, {shutdown, {server_initiated_close, Code, Explanation}}, amqp_death(Code, Explanation, State); handle_exit(Ch, Reason, State = #proc_state{channel = Ch}) -> - send_error("AMQP channel died", "Reason: ~p", [Reason], State), + _ = send_error("AMQP channel died", "Reason: ~p", [Reason], State), {stop, {channel_died, Reason}, State}; handle_exit(Ch, {shutdown, {server_initiated_close, Code, Explanation}}, State = #proc_state{channel = Ch}) -> @@ -248,10 +248,10 @@ process_request(ProcessFun, SuccessFun, State) -> end, case Res of {ok, Frame, NewState = #proc_state{connection = Conn}} -> - case Frame of - none -> ok; - _ -> send_frame(Frame, NewState) - end, + _ = case Frame of + none -> ok; + _ -> send_frame(Frame, NewState) + end, {ok, SuccessFun(NewState), Conn}; {error, Message, Detail, NewState = #proc_state{connection = Conn}} -> {ok, send_error(Message, Detail, NewState), Conn}; @@ -449,14 +449,14 @@ server_cancel_consumer(ConsumerTag, State = #proc_state{subscriptions = Subs}) - {ok, {_, Id1}} -> Id1; {error, {_, Id1}} -> "Unknown[" ++ Id1 ++ "]" end, - send_error_frame("Server cancelled subscription", - [{?HEADER_SUBSCRIPTION, Id}], - "The server has canceled a subscription.~n" - "No more messages will be delivered for ~p.~n", - [Description], - State), + _ = send_error_frame("Server cancelled subscription", + [{?HEADER_SUBSCRIPTION, Id}], + "The server has canceled a subscription.~n" + "No more messages will be delivered for ~p.~n", + [Description], + State), tidy_canceled_subscription(ConsumerTag, Subscription, - #stomp_frame{}, State) + undefined, State) end. cancel_subscription({error, invalid_prefix}, _Frame, State) -> @@ -495,6 +495,15 @@ cancel_subscription({ok, ConsumerTag, Description}, Frame, end end. +%% Server-initiated cancelations will pass an undefined instead of a +%% STOMP frame. In this case we know that the queue was deleted and +%% thus we don't have to clean it up. +tidy_canceled_subscription(ConsumerTag, _Subscription, + undefined, State = #proc_state{subscriptions = Subs}) -> + Subs1 = dict:erase(ConsumerTag, Subs), + ok(State#proc_state{subscriptions = Subs1}); + +%% Client-initiated cancelations will pass an actual frame tidy_canceled_subscription(ConsumerTag, #subscription{dest_hdr = DestHdr}, Frame, State = #proc_state{subscriptions = Subs}) -> Subs1 = maps:remove(ConsumerTag, Subs), @@ -663,8 +672,8 @@ do_subscribe(Destination, DestHdr, Frame, {ok, _} -> Message = "Duplicated subscription identifier", Detail = "A subscription identified by '~s' alredy exists.", - error(Message, Detail, [ConsumerTag], State), - send_error(Message, Detail, [ConsumerTag], State), + _ = error(Message, Detail, [ConsumerTag], State), + _ = send_error(Message, Detail, [ConsumerTag], State), {stop, normal, close_connection(State)}; error -> ExchangeAndKey = @@ -1049,7 +1058,7 @@ ensure_heartbeats(Heartbeats) -> {SendTimeout, ReceiveTimeout} = {millis_to_seconds(CY), millis_to_seconds(CX)}, - rabbit_stomp_reader:start_heartbeats(self(), {SendTimeout, ReceiveTimeout}), + _ = rabbit_stomp_reader:start_heartbeats(self(), {SendTimeout, ReceiveTimeout}), {SendTimeout * 1000 , ReceiveTimeout * 1000}. millis_to_seconds(M) when M =< 0 -> 0; diff --git a/deps/rabbitmq_stomp/src/rabbit_stomp_reader.erl b/deps/rabbitmq_stomp/src/rabbit_stomp_reader.erl index 3c666f26f5..7a7a4a1e83 100644 --- a/deps/rabbitmq_stomp/src/rabbit_stomp_reader.erl +++ b/deps/rabbitmq_stomp/src/rabbit_stomp_reader.erl @@ -52,7 +52,7 @@ start_link(SupHelperPid, Ref, Sock, Configuration) -> %% meaningless synchronous call to the underlying gen_event %% mechanism. When it returns the mailbox is drained, and we %% return to our caller to accept more connections. - gen_event:which_handlers(error_logger), + _ = gen_event:which_handlers(error_logger), {ok, Pid}. @@ -80,7 +80,7 @@ init([SupHelperPid, Ref, Sock, Configuration]) -> [self(), ConnStr]), ParseState = rabbit_stomp_frame:initial_state(), - register_resource_alarm(), + _ = register_resource_alarm(), gen_server2:enter_loop(?MODULE, [], rabbit_event:init_stats_timer( run_socket(control_throttle( @@ -275,15 +275,18 @@ run_socket(State = #reader_state{state = blocked}) -> run_socket(State = #reader_state{recv_outstanding = true}) -> State; run_socket(State = #reader_state{socket = Sock}) -> - rabbit_net:async_recv(Sock, 0, infinity), + _ = rabbit_net:async_recv(Sock, 0, infinity), State#reader_state{recv_outstanding = true}. +terminate(Reason, undefined) -> + log_reason(Reason, undefined), + {stop, Reason}; terminate(Reason, State = #reader_state{ processor_state = ProcState }) -> maybe_emit_stats(State), log_reason(Reason, State), - rabbit_stomp_processor:flush_and_die(ProcState), - ok. + _ = rabbit_stomp_processor:flush_and_die(ProcState), + {stop, Reason}. code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -331,12 +334,14 @@ log_reason({shutdown, client_heartbeat_timeout}, log_reason(normal, #reader_state{ conn_name = ConnName}) -> log(info, "closing STOMP connection ~p (~s)~n", [self(), ConnName]); +log_reason(shutdown, undefined) -> + log(error, "closing STOMP connection that never completed connection handshake (negotiation)~n", []); + log_reason(Reason, #reader_state{ processor_state = ProcState }) -> AdapterName = rabbit_stomp_processor:adapter_name(ProcState), rabbit_log:warning("STOMP connection ~s terminated" " with reason ~p, closing it~n", [AdapterName, Reason]). - %%---------------------------------------------------------------------------- processor_args(Configuration, Sock) -> @@ -417,8 +422,8 @@ info_internal(SockStat, #reader_state{socket = Sock}) when SockStat =:= recv_oct SockStat =:= send_cnt; SockStat =:= send_pend -> case rabbit_net:getstat(Sock, [SockStat]) of - {ok, [{_, I}]} -> I; - {error, _} -> '' + {ok, [{_, N}]} when is_number(N) -> N; + _ -> 0 end; info_internal(state, State) -> info_internal(connection_state, State); info_internal(garbage_collection, _State) -> diff --git a/deps/rabbitmq_stomp/src/rabbit_stomp_util.erl b/deps/rabbitmq_stomp/src/rabbit_stomp_util.erl index addfb6392a..bf96282b62 100644 --- a/deps/rabbitmq_stomp/src/rabbit_stomp_util.erl +++ b/deps/rabbitmq_stomp/src/rabbit_stomp_util.erl @@ -383,7 +383,6 @@ subscription_queue_name(Destination, SubscriptionId, Frame) -> %% ---- Helpers ---- split([], _Splitter) -> []; -split(Content, []) -> Content; split(Content, Splitter) -> split(Content, [], [], Splitter). split([], RPart, RParts, _Splitter) ->