Fix MQTT test flake

Prior to this commit, test
```
make -C deps/rabbitmq_mqtt ct-mqtt_shared t=[mqtt,cluster_size_1,v4]:non_clean_sess_reconnect_qos0_and_qos1
```

flaked in CI with error:
```
{mqtt_shared_SUITE,non_clean_sess_reconnect_qos0_and_qos1,972}
{badmatch,{publish_not_received,<<"msg-0">>}}
```

The problem was the following race condition:
* The MQTT v4 client sends an async DISCONNECT
* The global MQTT consumer metric got decremented. However, the classic
  queue still has the MQTT connection proc registered as consumer.
* The test case sends a message
* The classic queue checks out the message to the old connection instead
  of checking out the message to the new connection.

The solution in this commit is to check the consumer count of the
classic queue before proceeding to send the message after disconnection.
This commit is contained in:
David Ansari 2024-11-08 10:34:49 +01:00
parent 1872ce981a
commit 40bf778e89
1 changed files with 25 additions and 19 deletions

View File

@ -894,25 +894,22 @@ session_expiry(Config) ->
ok = rpc(Config, application, set_env, [App, Par, DefaultVal]).
non_clean_sess_reconnect_qos1(Config) ->
non_clean_sess_reconnect(Config, qos1).
non_clean_sess_reconnect(Config, 1).
non_clean_sess_reconnect_qos0(Config) ->
non_clean_sess_reconnect(Config, qos0).
non_clean_sess_reconnect(Config, 0).
non_clean_sess_reconnect(Config, SubscriptionQoS) ->
Pub = connect(<<"publisher">>, Config),
Topic = ClientId = atom_to_binary(?FUNCTION_NAME),
C1 = connect(ClientId, Config, non_clean_sess_opts()),
{ok, _, _} = emqtt:subscribe(C1, Topic, SubscriptionQoS),
?assertMatch(#{consumers := 1},
get_global_counters(Config)),
{ok, _, [SubscriptionQoS]} = emqtt:subscribe(C1, Topic, SubscriptionQoS),
ok = await_consumer_count(1, ClientId, SubscriptionQoS, Config),
ok = emqtt:disconnect(C1),
eventually(?_assertMatch(#{consumers := 0},
get_global_counters(Config))),
ok = await_consumer_count(0, ClientId, SubscriptionQoS, Config),
timer:sleep(20),
ok = emqtt:publish(Pub, Topic, <<"msg-3-qos0">>, qos0),
{ok, _} = emqtt:publish(Pub, Topic, <<"msg-4-qos1">>, qos1),
@ -920,8 +917,7 @@ non_clean_sess_reconnect(Config, SubscriptionQoS) ->
%% Server should reply in CONNACK that it has session state.
?assertEqual({session_present, 1},
proplists:lookup(session_present, emqtt:info(C2))),
?assertMatch(#{consumers := 1},
get_global_counters(Config)),
ok = await_consumer_count(1, ClientId, SubscriptionQoS, Config),
ok = emqtt:publish(Pub, Topic, <<"msg-5-qos0">>, qos0),
{ok, _} = emqtt:publish(Pub, Topic, <<"msg-6-qos1">>, qos1),
@ -954,21 +950,20 @@ non_clean_sess_reconnect_qos0_and_qos1(Config) ->
ClientId = ?FUNCTION_NAME,
C1 = connect(ClientId, Config, non_clean_sess_opts()),
{ok, _, [1, 0]} = emqtt:subscribe(C1, [{Topic1, qos1}, {Topic0, qos0}]),
?assertMatch(#{consumers := 1},
get_global_counters(Config)),
{ok, _, [1, 0]} = emqtt:subscribe(C1, [{Topic1, qos1},
{Topic0, qos0}]),
ok = await_consumer_count(1, ClientId, 0, Config),
ok = await_consumer_count(1, ClientId, 1, Config),
ok = emqtt:disconnect(C1),
eventually(?_assertMatch(#{consumers := 0},
get_global_counters(Config))),
ok = await_consumer_count(0, ClientId, 0, Config),
ok = await_consumer_count(0, ClientId, 1, Config),
{ok, _} = emqtt:publish(Pub, Topic0, <<"msg-0">>, qos1),
{ok, _} = emqtt:publish(Pub, Topic1, <<"msg-1">>, qos1),
C2 = connect(ClientId, Config, non_clean_sess_opts()),
?assertMatch(#{consumers := 1},
get_global_counters(Config)),
ok = await_consumer_count(1, ClientId, 0, Config),
ok = await_consumer_count(1, ClientId, 1, Config),
ok = expect_publishes(C2, Topic0, [<<"msg-0">>]),
ok = expect_publishes(C2, Topic1, [<<"msg-1">>]),
@ -1884,6 +1879,17 @@ await_confirms_unordered(From, Left) ->
ct:fail("~b confirms are missing", [Left])
end.
await_consumer_count(ConsumerCount, ClientId, QoS, Config) ->
Ch = rabbit_ct_client_helpers:open_channel(Config),
QueueName = rabbit_mqtt_util:queue_name_bin(
rabbit_data_coercion:to_binary(ClientId), QoS),
eventually(
?_assertMatch(
#'queue.declare_ok'{consumer_count = ConsumerCount},
amqp_channel:call(Ch, #'queue.declare'{queue = QueueName,
passive = true})), 500, 10),
ok = rabbit_ct_client_helpers:close_channel(Ch).
declare_queue(Ch, QueueName, Args)
when is_pid(Ch), is_binary(QueueName), is_list(Args) ->
#'queue.declare_ok'{} = amqp_channel:call(