From 57ef5ea64fcf6df043a912571b25687a85b5f9b7 Mon Sep 17 00:00:00 2001 From: Marcial Rosales Date: Wed, 4 Jun 2025 10:49:19 +0200 Subject: [PATCH 1/4] Delete mqtt qos0 when connection closes --- .../src/rabbit_mqtt_processor.erl | 2 +- deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl | 15 +++ .../test/web_mqtt_shared_SUITE.erl | 1 + selenium/package.json | 2 +- selenium/test/basic-auth/rabbitmq.conf | 5 +- .../queuesAndStreams/autodelete-mqtt-qos0.js | 111 ++++++++++++++++++ 6 files changed, 133 insertions(+), 3 deletions(-) create mode 100644 selenium/test/queuesAndStreams/autodelete-mqtt-qos0.js diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl index 2942d8c0d7..6c40c58db4 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl @@ -1906,7 +1906,7 @@ log_delayed_will_failure(Topic, ClientId, Reason) -> [Topic, ClientId, Reason]). maybe_delete_mqtt_qos0_queue( - State = #state{cfg = #cfg{clean_start = true}, + State = #state{cfg = #cfg{session_expiry_interval_secs = 0}, auth_state = #auth_state{user = #user{username = Username}}}) -> case get_queue(?QOS_0, State) of {ok, Q} -> diff --git a/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl b/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl index 8bb037d5ef..3854bf520b 100644 --- a/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl @@ -113,6 +113,7 @@ cluster_size_1_tests() -> ,block ,amqp_to_mqtt_qos0 ,clean_session_disconnect_client + ,zero_session_expiry_interval_disconnect_client ,clean_session_node_restart ,clean_session_node_kill ,rabbit_status_connection_count @@ -211,6 +212,7 @@ init_per_testcase(T, Config) init_per_testcase0(T, Config); init_per_testcase(T, Config) when T =:= clean_session_disconnect_client; + T =:= zero_session_expiry_interval_disconnect_client; T =:= clean_session_node_restart; T =:= clean_session_node_kill; T =:= notify_consumer_qos0_queue_deleted -> @@ -229,6 +231,7 @@ end_per_testcase(T, Config) end_per_testcase0(T, Config); end_per_testcase(T, Config) when T =:= clean_session_disconnect_client; + T =:= zero_session_expiry_interval_disconnect_client; T =:= clean_session_node_restart; T =:= clean_session_node_kill; T =:= notify_consumer_qos0_queue_deleted -> @@ -1583,6 +1586,18 @@ clean_session_disconnect_client(Config) -> L = rpc(Config, rabbit_amqqueue, list, []), ?assertEqual(0, length(L)). +zero_session_expiry_interval_disconnect_client(Config) -> + C = connect(?FUNCTION_NAME, Config, [{properties, #{'Session-Expiry-Interval' => 0}}]), + {ok, _, _} = emqtt:subscribe(C, <<"topic0">>, qos0), + QsQos0 = rpc(Config, rabbit_amqqueue, list_by_type, [rabbit_mqtt_qos0_queue]), + ?assertEqual(1, length(QsQos0)), + + ok = emqtt:disconnect(C), + %% After terminating a clean session, we expect any session state to be cleaned up on the server. + timer:sleep(200), %% Give some time to clean up exclusive classic queue. + L = rpc(Config, rabbit_amqqueue, list, []), + ?assertEqual(0, length(L)). + clean_session_node_restart(Config) -> clean_session_node_down(stop_node, Config). diff --git a/deps/rabbitmq_web_mqtt/test/web_mqtt_shared_SUITE.erl b/deps/rabbitmq_web_mqtt/test/web_mqtt_shared_SUITE.erl index e2b3f00672..427e651379 100644 --- a/deps/rabbitmq_web_mqtt/test/web_mqtt_shared_SUITE.erl +++ b/deps/rabbitmq_web_mqtt/test/web_mqtt_shared_SUITE.erl @@ -103,3 +103,4 @@ maintenance(Config) -> mqtt_shared_SUITE:?FUNCTION_NAME(Config). notify_consumer_classic_queue_deleted(Config) -> mqtt_shared_SUITE:?FUNCTION_NAME(Config). notify_consumer_quorum_queue_deleted(Config) -> mqtt_shared_SUITE:?FUNCTION_NAME(Config). notify_consumer_qos0_queue_deleted(Config) -> mqtt_shared_SUITE:?FUNCTION_NAME(Config). +zero_session_expiry_interval_disconnect_client(Config) -> mqtt_shared_SUITE:?FUNCTION_NAME(Config). \ No newline at end of file diff --git a/selenium/package.json b/selenium/package.json index c84f5668ff..c79d91274d 100644 --- a/selenium/package.json +++ b/selenium/package.json @@ -12,7 +12,7 @@ "author": "", "license": "ISC", "dependencies": { - "chromedriver": "^135.0", + "chromedriver": "^137.0", "ejs": "^3.1.8", "express": "^4.18.2", "geckodriver": "^3.0.2", diff --git a/selenium/test/basic-auth/rabbitmq.conf b/selenium/test/basic-auth/rabbitmq.conf index 7bacc14af2..8bdbec84dd 100644 --- a/selenium/test/basic-auth/rabbitmq.conf +++ b/selenium/test/basic-auth/rabbitmq.conf @@ -1,6 +1,9 @@ auth_backends.1 = rabbit_auth_backend_internal -management.login_session_timeout = 1 load_definitions = ${IMPORT_DIR}/users.json +management.login_session_timeout = 1 + loopback_users = none + +log.console.level = debug diff --git a/selenium/test/queuesAndStreams/autodelete-mqtt-qos0.js b/selenium/test/queuesAndStreams/autodelete-mqtt-qos0.js new file mode 100644 index 0000000000..1e90f82d02 --- /dev/null +++ b/selenium/test/queuesAndStreams/autodelete-mqtt-qos0.js @@ -0,0 +1,111 @@ +const { By, Key, until, Builder } = require('selenium-webdriver') +require('chromedriver') +const assert = require('assert') +const { buildDriver, goToHome, goToQueue, captureScreensFor, teardown, doUntil, findTableRow } = require('../utils') +const { createQueue, getManagementUrl, basicAuthorization } = require('../mgt-api') +const { openConnection, getConnectionOptions } = require('../mqtt') + +const LoginPage = require('../pageobjects/LoginPage') +const OverviewPage = require('../pageobjects/OverviewPage') +const QueuesAndStreamsPage = require('../pageobjects/QueuesAndStreamsPage') +const QueuePage = require('../pageobjects/QueuePage') +const ConnectionsPage = require('../pageobjects/ConnectionsPage'); + + +describe('Given an MQTT 5.0 connection with a qos 0 subscription with zero sessionExpiryInterval', function () { + let login + let queuesAndStreamsPage + let queuePage + let overview + let captureScreen + let queueName + + let mqttClient + + before(async function () { + driver = buildDriver() + await goToHome(driver) + login = new LoginPage(driver) + overview = new OverviewPage(driver) + queuePage = new QueuePage(driver) + connectionsPage = new ConnectionsPage(driver) + queuesAndStreamsPage = new QueuesAndStreamsPage(driver) + captureScreen = captureScreensFor(driver, __filename) + + await login.login('management', 'guest') + if (!await overview.isLoaded()) { + throw new Error('Failed to login') + } + //await overview.selectRefreshOption("Do not refresh") + + queueName = "test_" + Math.floor(Math.random() * 1000) + createQueue(getManagementUrl(), basicAuthorization("management", "guest"), + "/", queueName, { + "x-queue-type": "quorum" + }) + + mqttClient = openConnection(getConnectionOptions()) + let subscribed = new Promise((resolve, reject) => { + mqttClient.on('error', function(err) { + reject(err) + assert.fail("Mqtt connection failed due to " + err) + }), + mqttClient.on('connect', function(err) { + mqttClient.subscribe(queueName, {qos:0}, function (err2) { + if (!err2) { + resolve("ok") + }else { + reject(err2) + } + }) + }) + }) + assert.equal("ok", await subscribed) + + }) + + it('can view mqtt qos0 queue', async function () { + await overview.clickOnQueuesTab() + + let table = await doUntil(function() { + return queuesAndStreamsPage.getQueuesTable() + }, function(t) { + return findTableRow(t, function(row) { + return row[2] === 'rabbit_mqtt_qos0_queue' + }) + }) + let mqttQueueName = findTableRow(table, function(row) { + return row[2] === 'rabbit_mqtt_qos0_queue' + })[1] + + await goToQueue(driver, "/", mqttQueueName) + await queuePage.isLoaded() + + }) + + it('when the connection is closed, the mqtt qos0 queue should be removed', async function () { + + mqttClient.end() + + await overview.clickOnConnectionsTab() + await doUntil(async function() { + return connectionsPage.getPagingSectionHeaderText() + }, function(header) { + return header === "All connections (0)" + }, 6000) + + await overview.clickOnQueuesTab() + await doUntil(function() { + return queuesAndStreamsPage.getQueuesTable() + }, function(table) { + return !findTableRow(table, function(row) { + return row[2] === 'rabbit_mqtt_qos0_queue' + }) + }) + + }) + + after(async function () { + await teardown(driver, this, captureScreen) + }) +}) From bc7a8be85a9651c4adc4089556e87698363b7957 Mon Sep 17 00:00:00 2001 From: Marcial Rosales Date: Wed, 4 Jun 2025 16:14:46 +0200 Subject: [PATCH 2/4] Move test to v5 because it is a feature exclusive of v5 --- deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl | 15 +---------- deps/rabbitmq_mqtt/test/v5_SUITE.erl | 27 +++++++++++++++++++ 2 files changed, 28 insertions(+), 14 deletions(-) diff --git a/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl b/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl index 3854bf520b..c06ecf4462 100644 --- a/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl @@ -112,8 +112,7 @@ cluster_size_1_tests() -> ,keepalive_turned_off ,block ,amqp_to_mqtt_qos0 - ,clean_session_disconnect_client - ,zero_session_expiry_interval_disconnect_client + ,clean_session_disconnect_client ,clean_session_node_restart ,clean_session_node_kill ,rabbit_status_connection_count @@ -1586,18 +1585,6 @@ clean_session_disconnect_client(Config) -> L = rpc(Config, rabbit_amqqueue, list, []), ?assertEqual(0, length(L)). -zero_session_expiry_interval_disconnect_client(Config) -> - C = connect(?FUNCTION_NAME, Config, [{properties, #{'Session-Expiry-Interval' => 0}}]), - {ok, _, _} = emqtt:subscribe(C, <<"topic0">>, qos0), - QsQos0 = rpc(Config, rabbit_amqqueue, list_by_type, [rabbit_mqtt_qos0_queue]), - ?assertEqual(1, length(QsQos0)), - - ok = emqtt:disconnect(C), - %% After terminating a clean session, we expect any session state to be cleaned up on the server. - timer:sleep(200), %% Give some time to clean up exclusive classic queue. - L = rpc(Config, rabbit_amqqueue, list, []), - ?assertEqual(0, length(L)). - clean_session_node_restart(Config) -> clean_session_node_down(stop_node, Config). diff --git a/deps/rabbitmq_mqtt/test/v5_SUITE.erl b/deps/rabbitmq_mqtt/test/v5_SUITE.erl index 44a1950944..191a49a5fd 100644 --- a/deps/rabbitmq_mqtt/test/v5_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/v5_SUITE.erl @@ -71,6 +71,7 @@ cluster_size_1_tests() -> session_expiry_reconnect_non_zero, session_expiry_reconnect_zero, session_expiry_reconnect_infinity_to_zero, + zero_session_expiry_disconnect_autodeletes_qos0_queue, client_publish_qos2, client_rejects_publish, client_receive_maximum_min, @@ -188,6 +189,12 @@ init_per_testcase(T, Config) ok = rpc(Config, application, set_env, [?APP, Par, infinity]), Config1 = rabbit_ct_helpers:set_config(Config, {Par, Default}), init_per_testcase0(T, Config1); + +init_per_testcase(T, Config) + when T =:= zero_session_expiry_disconnect_autodeletes_qos0_queue -> + rpc(Config, rabbit_registry, register, [queue, <<"qos0">>, rabbit_mqtt_qos0_queue]), + init_per_testcase0(T, Config); + init_per_testcase(T, Config) -> init_per_testcase0(T, Config). @@ -202,6 +209,11 @@ end_per_testcase(T, Config) Default = ?config(Par, Config), ok = rpc(Config, application, set_env, [?APP, Par, Default]), end_per_testcase0(T, Config); +end_per_testcase(T, Config) + when T =:= zero_session_expiry_disconnect_autodeletes_qos0_queue -> + ok = rpc(Config, rabbit_registry, unregister, [queue, <<"qos0">>]), + init_per_testcase0(T, Config); + end_per_testcase(T, Config) -> end_per_testcase0(T, Config). @@ -389,6 +401,21 @@ session_expiry_quorum_queue_disconnect_decrease(Config) -> ok = session_expiry_disconnect_decrease(rabbit_quorum_queue, Config), ok = rpc(Config, application, unset_env, [?APP, durable_queue_type]). +zero_session_expiry_disconnect_autodeletes_qos0_queue(Config) -> + ClientId = ?FUNCTION_NAME, + C = connect(ClientId, Config, [ + {clean_start, false}, + {properties, #{'Session-Expiry-Interval' => 0}}]), + {ok, _, _} = emqtt:subscribe(C, <<"topic0">>, qos0), + QsQos0 = rpc(Config, rabbit_amqqueue, list_by_type, [rabbit_mqtt_qos0_queue]), + ?assertEqual(1, length(QsQos0)), + + ok = emqtt:disconnect(C), + %% After terminating a clean session, we expect any session state to be cleaned up on the server. + timer:sleep(200), %% Give some time to clean up exclusive classic queue. + L = rpc(Config, rabbit_amqqueue, list, []), + ?assertEqual(0, length(L)). + session_expiry_disconnect_decrease(QueueType, Config) -> ClientId = ?FUNCTION_NAME, C1 = connect(ClientId, Config, [{properties, #{'Session-Expiry-Interval' => 100}}]), From 69baf91df6921a98f5fe2d91fb7495dd85ae6077 Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Wed, 4 Jun 2025 18:30:31 +0400 Subject: [PATCH 3/4] MQTT: correct a comment in v5_SUITE #14006 --- deps/rabbitmq_mqtt/test/v5_SUITE.erl | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/deps/rabbitmq_mqtt/test/v5_SUITE.erl b/deps/rabbitmq_mqtt/test/v5_SUITE.erl index 191a49a5fd..d0cff4eda2 100644 --- a/deps/rabbitmq_mqtt/test/v5_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/v5_SUITE.erl @@ -412,7 +412,8 @@ zero_session_expiry_disconnect_autodeletes_qos0_queue(Config) -> ok = emqtt:disconnect(C), %% After terminating a clean session, we expect any session state to be cleaned up on the server. - timer:sleep(200), %% Give some time to clean up exclusive classic queue. + %% Give the node some time to clean up the MQTT QoS 0 queue. + timer:sleep(200), L = rpc(Config, rabbit_amqqueue, list, []), ?assertEqual(0, length(L)). From ae9e1953fccf92075d69027c30b1e3232390ad8d Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Wed, 4 Jun 2025 18:48:47 +0400 Subject: [PATCH 4/4] Trailing whitespace --- deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl b/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl index c06ecf4462..c574c08a27 100644 --- a/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl @@ -112,7 +112,7 @@ cluster_size_1_tests() -> ,keepalive_turned_off ,block ,amqp_to_mqtt_qos0 - ,clean_session_disconnect_client + ,clean_session_disconnect_client ,clean_session_node_restart ,clean_session_node_kill ,rabbit_status_connection_count