From 40bf778e89f6830582286a46db4ad3f9415bd6f4 Mon Sep 17 00:00:00 2001 From: David Ansari Date: Fri, 8 Nov 2024 10:34:49 +0100 Subject: [PATCH] Fix MQTT test flake Prior to this commit, test ``` make -C deps/rabbitmq_mqtt ct-mqtt_shared t=[mqtt,cluster_size_1,v4]:non_clean_sess_reconnect_qos0_and_qos1 ``` flaked in CI with error: ``` {mqtt_shared_SUITE,non_clean_sess_reconnect_qos0_and_qos1,972} {badmatch,{publish_not_received,<<"msg-0">>}} ``` The problem was the following race condition: * The MQTT v4 client sends an async DISCONNECT * The global MQTT consumer metric got decremented. However, the classic queue still has the MQTT connection proc registered as consumer. * The test case sends a message * The classic queue checks out the message to the old connection instead of checking out the message to the new connection. The solution in this commit is to check the consumer count of the classic queue before proceeding to send the message after disconnection. --- deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl | 44 +++++++++++-------- 1 file changed, 25 insertions(+), 19 deletions(-) diff --git a/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl b/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl index 5af808e997..b5c152b6ea 100644 --- a/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl @@ -894,25 +894,22 @@ session_expiry(Config) -> ok = rpc(Config, application, set_env, [App, Par, DefaultVal]). non_clean_sess_reconnect_qos1(Config) -> - non_clean_sess_reconnect(Config, qos1). + non_clean_sess_reconnect(Config, 1). non_clean_sess_reconnect_qos0(Config) -> - non_clean_sess_reconnect(Config, qos0). + non_clean_sess_reconnect(Config, 0). non_clean_sess_reconnect(Config, SubscriptionQoS) -> Pub = connect(<<"publisher">>, Config), Topic = ClientId = atom_to_binary(?FUNCTION_NAME), C1 = connect(ClientId, Config, non_clean_sess_opts()), - {ok, _, _} = emqtt:subscribe(C1, Topic, SubscriptionQoS), - ?assertMatch(#{consumers := 1}, - get_global_counters(Config)), + {ok, _, [SubscriptionQoS]} = emqtt:subscribe(C1, Topic, SubscriptionQoS), + ok = await_consumer_count(1, ClientId, SubscriptionQoS, Config), ok = emqtt:disconnect(C1), - eventually(?_assertMatch(#{consumers := 0}, - get_global_counters(Config))), + ok = await_consumer_count(0, ClientId, SubscriptionQoS, Config), - timer:sleep(20), ok = emqtt:publish(Pub, Topic, <<"msg-3-qos0">>, qos0), {ok, _} = emqtt:publish(Pub, Topic, <<"msg-4-qos1">>, qos1), @@ -920,8 +917,7 @@ non_clean_sess_reconnect(Config, SubscriptionQoS) -> %% Server should reply in CONNACK that it has session state. ?assertEqual({session_present, 1}, proplists:lookup(session_present, emqtt:info(C2))), - ?assertMatch(#{consumers := 1}, - get_global_counters(Config)), + ok = await_consumer_count(1, ClientId, SubscriptionQoS, Config), ok = emqtt:publish(Pub, Topic, <<"msg-5-qos0">>, qos0), {ok, _} = emqtt:publish(Pub, Topic, <<"msg-6-qos1">>, qos1), @@ -954,21 +950,20 @@ non_clean_sess_reconnect_qos0_and_qos1(Config) -> ClientId = ?FUNCTION_NAME, C1 = connect(ClientId, Config, non_clean_sess_opts()), - {ok, _, [1, 0]} = emqtt:subscribe(C1, [{Topic1, qos1}, {Topic0, qos0}]), - ?assertMatch(#{consumers := 1}, - get_global_counters(Config)), + {ok, _, [1, 0]} = emqtt:subscribe(C1, [{Topic1, qos1}, + {Topic0, qos0}]), + ok = await_consumer_count(1, ClientId, 0, Config), + ok = await_consumer_count(1, ClientId, 1, Config), ok = emqtt:disconnect(C1), - eventually(?_assertMatch(#{consumers := 0}, - get_global_counters(Config))), - + ok = await_consumer_count(0, ClientId, 0, Config), + ok = await_consumer_count(0, ClientId, 1, Config), {ok, _} = emqtt:publish(Pub, Topic0, <<"msg-0">>, qos1), {ok, _} = emqtt:publish(Pub, Topic1, <<"msg-1">>, qos1), C2 = connect(ClientId, Config, non_clean_sess_opts()), - ?assertMatch(#{consumers := 1}, - get_global_counters(Config)), - + ok = await_consumer_count(1, ClientId, 0, Config), + ok = await_consumer_count(1, ClientId, 1, Config), ok = expect_publishes(C2, Topic0, [<<"msg-0">>]), ok = expect_publishes(C2, Topic1, [<<"msg-1">>]), @@ -1884,6 +1879,17 @@ await_confirms_unordered(From, Left) -> ct:fail("~b confirms are missing", [Left]) end. +await_consumer_count(ConsumerCount, ClientId, QoS, Config) -> + Ch = rabbit_ct_client_helpers:open_channel(Config), + QueueName = rabbit_mqtt_util:queue_name_bin( + rabbit_data_coercion:to_binary(ClientId), QoS), + eventually( + ?_assertMatch( + #'queue.declare_ok'{consumer_count = ConsumerCount}, + amqp_channel:call(Ch, #'queue.declare'{queue = QueueName, + passive = true})), 500, 10), + ok = rabbit_ct_client_helpers:close_channel(Ch). + declare_queue(Ch, QueueName, Args) when is_pid(Ch), is_binary(QueueName), is_list(Args) -> #'queue.declare_ok'{} = amqp_channel:call(