diff --git a/deps/rabbitmq_mqtt/include/rabbit_mqtt.hrl b/deps/rabbitmq_mqtt/include/rabbit_mqtt.hrl index 430e756de8..7b2a0fb60a 100644 --- a/deps/rabbitmq_mqtt/include/rabbit_mqtt.hrl +++ b/deps/rabbitmq_mqtt/include/rabbit_mqtt.hrl @@ -42,7 +42,6 @@ clean_sess :: boolean(), will_msg, exchange :: rabbit_exchange:name(), - has_subs = false :: boolean(), has_published = false :: boolean(), ssl_login_name, %% Retained messages handler. See rabbit_mqtt_retainer_sup diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl index 679024ddae..6a0a5f1839 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl @@ -164,6 +164,7 @@ process_request(?SUBSCRIBE, #proc_state{send_fun = SendFun, retainer_pid = RPid} = PState0) -> rabbit_log_connection:debug("Received a SUBSCRIBE for topic(s) ~p", [Topics]), + HasSubsBefore = has_subs(PState0), {QosResponse, PState1} = lists:foldl(fun(_Topic, {[?SUBACK_FAILURE | _] = L, S}) -> %% Once a subscription failed, mark all following subscriptions @@ -185,9 +186,7 @@ process_request(?SUBSCRIBE, %% for the same queue case consume(Q, QoS, S1) of {ok, S2} -> - S3 = S2#proc_state{has_subs = true}, - maybe_increment_consumer(S3, S2), - {[QoS | L], S3}; + {[QoS | L], S2}; {error, _Reason} -> {[?SUBACK_FAILURE | L], S1} end; @@ -200,6 +199,7 @@ process_request(?SUBSCRIBE, {[?SUBACK_FAILURE | L], S0} end end, {[], PState0}, Topics), + maybe_increment_consumer(HasSubsBefore, PState1), SendFun( #mqtt_frame{fixed = #mqtt_frame_fixed{type = ?SUBACK}, variable = #mqtt_frame_suback{ @@ -223,6 +223,7 @@ process_request(?UNSUBSCRIBE, payload = undefined}, PState0 = #proc_state{send_fun = SendFun}) -> rabbit_log_connection:debug("Received an UNSUBSCRIBE for topic(s) ~p", [Topics]), + HasSubsBefore = has_subs(PState0), PState = lists:foldl( fun(#mqtt_topic{name = TopicName}, #proc_state{} = S0) -> case find_queue_name(TopicName, S0) of @@ -244,20 +245,8 @@ process_request(?UNSUBSCRIBE, variable = #mqtt_frame_suback{message_id = MessageId}}, PState), - PState3 = case rabbit_binding:list_for_destination(queue_name(?QOS_0, PState)) of - [] -> - case rabbit_binding:list_for_destination(queue_name(?QOS_1, PState)) of - [] -> - PState2 = #proc_state{has_subs = false}, - maybe_decrement_consumer(PState, PState2), - PState2; - _ -> - PState - end; - _ -> - PState - end, - {ok, PState3}; + maybe_decrement_consumer(HasSubsBefore, PState), + {ok, PState}; process_request(?PINGREQ, #mqtt_frame{}, PState = #proc_state{send_fun = SendFun}) -> rabbit_log_connection:debug("Received a PINGREQ"), @@ -462,7 +451,7 @@ start_keepalive(#mqtt_frame_connect{keep_alive = Seconds}, #proc_state{socket = Socket}) -> ok = rabbit_mqtt_keepalive:start(Seconds, Socket). -handle_clean_session(_, PState0 = #proc_state{clean_sess = false}) -> +handle_clean_session(_, PState0 = #proc_state{clean_sess = false, proto_ver = ProtoVer}) -> case get_queue(?QOS_1, PState0) of {error, not_found} -> %% Queue will be created later when client subscribes. @@ -470,6 +459,7 @@ handle_clean_session(_, PState0 = #proc_state{clean_sess = false}) -> {ok, Q} -> case consume(Q, ?QOS_1, PState0) of {ok, PState} -> + rabbit_global_counters:consumer_created(ProtoVer), {ok, _SessionPresent = true, PState}; {error, access_refused} -> {error, ?CONNACK_NOT_AUTHORIZED}; @@ -539,11 +529,22 @@ find_queue_name(TopicName, #proc_state{exchange = Exchange, end. lookup_binding(Exchange, QueueName, RoutingKey) -> - B= #binding{source = Exchange, + B = #binding{source = Exchange, destination = QueueName, key = RoutingKey}, lists:member(B, rabbit_binding:list_for_source_and_destination(Exchange, QueueName)). +has_subs(#proc_state{exchange = Exchange} = PState) -> + has_subs_between(Exchange, queue_name(?QOS_0, PState)) orelse + has_subs_between(Exchange, queue_name(?QOS_1, PState)). +has_subs_between(Exchange, QueueName) -> + case rabbit_binding:list_for_source_and_destination(Exchange, QueueName) of + [] -> + false; + _ -> + true + end. + hand_off_to_retainer(RetainerPid, Amqp2MqttFun, Topic0, #mqtt_msg{payload = <<"">>}) -> Topic1 = Amqp2MqttFun(Topic0), rabbit_mqtt_retainer:clear(RetainerPid, Topic1), @@ -1649,7 +1650,6 @@ ssl_login_name(Sock) -> format_status(#proc_state{queue_states = QState, proto_ver = ProtoVersion, - has_subs = HasSubs, unacked_client_pubs = UnackClientPubs, unacked_server_pubs = UnackSerPubs, packet_id = PackID, @@ -1667,7 +1667,6 @@ format_status(#proc_state{queue_states = QState, } = PState) -> #{queue_states => rabbit_queue_type:format_status(QState), proto_ver => ProtoVersion, - has_subs => HasSubs, unacked_client_pubs => UnackClientPubs, unacked_server_pubs => UnackSerPubs, packet_id => PackID, @@ -1706,21 +1705,35 @@ maybe_decrement_publisher(_) -> ok. %% multiple subscriptions from the same connection count as one consumer -maybe_increment_consumer(#proc_state{has_subs = true, proto_ver = ProtoVer}, - #proc_state{has_subs = false}) -> - rabbit_global_counters:consumer_created(ProtoVer); +maybe_increment_consumer(false, #proc_state{proto_ver = ProtoVer} = PState) -> + case has_subs(PState) of + true -> + rabbit_global_counters:consumer_created(ProtoVer); + false -> + ok + end; maybe_increment_consumer(_, _) -> ok. -maybe_decrement_consumer(#proc_state{has_subs = true, - proto_ver = ProtoVer}) -> - rabbit_global_counters:consumer_deleted(ProtoVer); -maybe_decrement_consumer(_) -> - ok. - -% when there were subscriptions but not anymore -maybe_decrement_consumer(#proc_state{has_subs = false, proto_ver = ProtoVer}, - #proc_state{has_subs = true}) -> - rabbit_global_counters:consumer_deleted(ProtoVer); +%% when there were subscriptions but not anymore +maybe_decrement_consumer(true, #proc_state{proto_ver = ProtoVer} = PState) -> + case has_subs(PState) of + false -> + rabbit_global_counters:consumer_deleted(ProtoVer); + true -> + ok + end; maybe_decrement_consumer(_, _) -> ok. + +%% used when connection is terminated +maybe_decrement_consumer(#proc_state{proto_ver = ProtoVer, + auth_state = #auth_state{vhost = _Vhost}} = PState) -> + case has_subs(PState) of + true -> + rabbit_global_counters:consumer_deleted(ProtoVer); + false -> + ok + end; +maybe_decrement_consumer(_) -> + ok. diff --git a/deps/rabbitmq_mqtt/test/integration_SUITE.erl b/deps/rabbitmq_mqtt/test/integration_SUITE.erl index a017f33931..a10f9057cb 100644 --- a/deps/rabbitmq_mqtt/test/integration_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/integration_SUITE.erl @@ -483,9 +483,8 @@ consuming_classic_queue_down(Config) -> {ok, C2} = emqtt:start_link([{clean_start, false} | Options]), {ok, _} = emqtt:connect(C2), - %%TODO uncomment below 2 lines once consumers counter works for clean_sess = false - % ?assertMatch(#{consumers := 1}, - % get_global_counters(Config, ProtoVer, Server3)), + ?assertMatch(#{consumers := 1}, + get_global_counters(Config, ProtoVer, Server3)), %% Let's stop the queue leader node. process_flag(trap_exit, true), @@ -574,12 +573,21 @@ non_clean_sess_disconnect(Config) -> {C1, _} = connect(?FUNCTION_NAME, Config, [{clean_start, false}]), Topic = <<"test-topic1">>, {ok, _, [1]} = emqtt:subscribe(C1, Topic, qos1), + ?assertMatch(#{consumers := 1}, + get_global_counters(Config, v4)), + ok = emqtt:disconnect(C1), + ?assertMatch(#{consumers := 0}, + get_global_counters(Config, v4)), {C2, _} = connect(?FUNCTION_NAME, Config, [{clean_start, false}]), + ?assertMatch(#{consumers := 1}, + get_global_counters(Config, v4)), %% shouldn't receive message after unsubscribe {ok, _, _} = emqtt:unsubscribe(C2, Topic), + ?assertMatch(#{consumers := 0}, + get_global_counters(Config, v4)), Msg = <<"msg">>, {ok, _} = emqtt:publish(C2, Topic, Msg, qos1), {publish_not_received, Msg} = expect_publishes(Topic, [Msg]),