Delete mqtt qos0 when connection closes
This commit is contained in:
parent
f7a238a8f4
commit
57ef5ea64f
|
@ -1906,7 +1906,7 @@ log_delayed_will_failure(Topic, ClientId, Reason) ->
|
|||
[Topic, ClientId, Reason]).
|
||||
|
||||
maybe_delete_mqtt_qos0_queue(
|
||||
State = #state{cfg = #cfg{clean_start = true},
|
||||
State = #state{cfg = #cfg{session_expiry_interval_secs = 0},
|
||||
auth_state = #auth_state{user = #user{username = Username}}}) ->
|
||||
case get_queue(?QOS_0, State) of
|
||||
{ok, Q} ->
|
||||
|
|
|
@ -113,6 +113,7 @@ cluster_size_1_tests() ->
|
|||
,block
|
||||
,amqp_to_mqtt_qos0
|
||||
,clean_session_disconnect_client
|
||||
,zero_session_expiry_interval_disconnect_client
|
||||
,clean_session_node_restart
|
||||
,clean_session_node_kill
|
||||
,rabbit_status_connection_count
|
||||
|
@ -211,6 +212,7 @@ init_per_testcase(T, Config)
|
|||
init_per_testcase0(T, Config);
|
||||
init_per_testcase(T, Config)
|
||||
when T =:= clean_session_disconnect_client;
|
||||
T =:= zero_session_expiry_interval_disconnect_client;
|
||||
T =:= clean_session_node_restart;
|
||||
T =:= clean_session_node_kill;
|
||||
T =:= notify_consumer_qos0_queue_deleted ->
|
||||
|
@ -229,6 +231,7 @@ end_per_testcase(T, Config)
|
|||
end_per_testcase0(T, Config);
|
||||
end_per_testcase(T, Config)
|
||||
when T =:= clean_session_disconnect_client;
|
||||
T =:= zero_session_expiry_interval_disconnect_client;
|
||||
T =:= clean_session_node_restart;
|
||||
T =:= clean_session_node_kill;
|
||||
T =:= notify_consumer_qos0_queue_deleted ->
|
||||
|
@ -1583,6 +1586,18 @@ clean_session_disconnect_client(Config) ->
|
|||
L = rpc(Config, rabbit_amqqueue, list, []),
|
||||
?assertEqual(0, length(L)).
|
||||
|
||||
zero_session_expiry_interval_disconnect_client(Config) ->
|
||||
C = connect(?FUNCTION_NAME, Config, [{properties, #{'Session-Expiry-Interval' => 0}}]),
|
||||
{ok, _, _} = emqtt:subscribe(C, <<"topic0">>, qos0),
|
||||
QsQos0 = rpc(Config, rabbit_amqqueue, list_by_type, [rabbit_mqtt_qos0_queue]),
|
||||
?assertEqual(1, length(QsQos0)),
|
||||
|
||||
ok = emqtt:disconnect(C),
|
||||
%% After terminating a clean session, we expect any session state to be cleaned up on the server.
|
||||
timer:sleep(200), %% Give some time to clean up exclusive classic queue.
|
||||
L = rpc(Config, rabbit_amqqueue, list, []),
|
||||
?assertEqual(0, length(L)).
|
||||
|
||||
clean_session_node_restart(Config) ->
|
||||
clean_session_node_down(stop_node, Config).
|
||||
|
||||
|
|
|
@ -103,3 +103,4 @@ maintenance(Config) -> mqtt_shared_SUITE:?FUNCTION_NAME(Config).
|
|||
notify_consumer_classic_queue_deleted(Config) -> mqtt_shared_SUITE:?FUNCTION_NAME(Config).
|
||||
notify_consumer_quorum_queue_deleted(Config) -> mqtt_shared_SUITE:?FUNCTION_NAME(Config).
|
||||
notify_consumer_qos0_queue_deleted(Config) -> mqtt_shared_SUITE:?FUNCTION_NAME(Config).
|
||||
zero_session_expiry_interval_disconnect_client(Config) -> mqtt_shared_SUITE:?FUNCTION_NAME(Config).
|
|
@ -12,7 +12,7 @@
|
|||
"author": "",
|
||||
"license": "ISC",
|
||||
"dependencies": {
|
||||
"chromedriver": "^135.0",
|
||||
"chromedriver": "^137.0",
|
||||
"ejs": "^3.1.8",
|
||||
"express": "^4.18.2",
|
||||
"geckodriver": "^3.0.2",
|
||||
|
|
|
@ -1,6 +1,9 @@
|
|||
auth_backends.1 = rabbit_auth_backend_internal
|
||||
|
||||
management.login_session_timeout = 1
|
||||
load_definitions = ${IMPORT_DIR}/users.json
|
||||
|
||||
management.login_session_timeout = 1
|
||||
|
||||
loopback_users = none
|
||||
|
||||
log.console.level = debug
|
||||
|
|
|
@ -0,0 +1,111 @@
|
|||
const { By, Key, until, Builder } = require('selenium-webdriver')
|
||||
require('chromedriver')
|
||||
const assert = require('assert')
|
||||
const { buildDriver, goToHome, goToQueue, captureScreensFor, teardown, doUntil, findTableRow } = require('../utils')
|
||||
const { createQueue, getManagementUrl, basicAuthorization } = require('../mgt-api')
|
||||
const { openConnection, getConnectionOptions } = require('../mqtt')
|
||||
|
||||
const LoginPage = require('../pageobjects/LoginPage')
|
||||
const OverviewPage = require('../pageobjects/OverviewPage')
|
||||
const QueuesAndStreamsPage = require('../pageobjects/QueuesAndStreamsPage')
|
||||
const QueuePage = require('../pageobjects/QueuePage')
|
||||
const ConnectionsPage = require('../pageobjects/ConnectionsPage');
|
||||
|
||||
|
||||
describe('Given an MQTT 5.0 connection with a qos 0 subscription with zero sessionExpiryInterval', function () {
|
||||
let login
|
||||
let queuesAndStreamsPage
|
||||
let queuePage
|
||||
let overview
|
||||
let captureScreen
|
||||
let queueName
|
||||
|
||||
let mqttClient
|
||||
|
||||
before(async function () {
|
||||
driver = buildDriver()
|
||||
await goToHome(driver)
|
||||
login = new LoginPage(driver)
|
||||
overview = new OverviewPage(driver)
|
||||
queuePage = new QueuePage(driver)
|
||||
connectionsPage = new ConnectionsPage(driver)
|
||||
queuesAndStreamsPage = new QueuesAndStreamsPage(driver)
|
||||
captureScreen = captureScreensFor(driver, __filename)
|
||||
|
||||
await login.login('management', 'guest')
|
||||
if (!await overview.isLoaded()) {
|
||||
throw new Error('Failed to login')
|
||||
}
|
||||
//await overview.selectRefreshOption("Do not refresh")
|
||||
|
||||
queueName = "test_" + Math.floor(Math.random() * 1000)
|
||||
createQueue(getManagementUrl(), basicAuthorization("management", "guest"),
|
||||
"/", queueName, {
|
||||
"x-queue-type": "quorum"
|
||||
})
|
||||
|
||||
mqttClient = openConnection(getConnectionOptions())
|
||||
let subscribed = new Promise((resolve, reject) => {
|
||||
mqttClient.on('error', function(err) {
|
||||
reject(err)
|
||||
assert.fail("Mqtt connection failed due to " + err)
|
||||
}),
|
||||
mqttClient.on('connect', function(err) {
|
||||
mqttClient.subscribe(queueName, {qos:0}, function (err2) {
|
||||
if (!err2) {
|
||||
resolve("ok")
|
||||
}else {
|
||||
reject(err2)
|
||||
}
|
||||
})
|
||||
})
|
||||
})
|
||||
assert.equal("ok", await subscribed)
|
||||
|
||||
})
|
||||
|
||||
it('can view mqtt qos0 queue', async function () {
|
||||
await overview.clickOnQueuesTab()
|
||||
|
||||
let table = await doUntil(function() {
|
||||
return queuesAndStreamsPage.getQueuesTable()
|
||||
}, function(t) {
|
||||
return findTableRow(t, function(row) {
|
||||
return row[2] === 'rabbit_mqtt_qos0_queue'
|
||||
})
|
||||
})
|
||||
let mqttQueueName = findTableRow(table, function(row) {
|
||||
return row[2] === 'rabbit_mqtt_qos0_queue'
|
||||
})[1]
|
||||
|
||||
await goToQueue(driver, "/", mqttQueueName)
|
||||
await queuePage.isLoaded()
|
||||
|
||||
})
|
||||
|
||||
it('when the connection is closed, the mqtt qos0 queue should be removed', async function () {
|
||||
|
||||
mqttClient.end()
|
||||
|
||||
await overview.clickOnConnectionsTab()
|
||||
await doUntil(async function() {
|
||||
return connectionsPage.getPagingSectionHeaderText()
|
||||
}, function(header) {
|
||||
return header === "All connections (0)"
|
||||
}, 6000)
|
||||
|
||||
await overview.clickOnQueuesTab()
|
||||
await doUntil(function() {
|
||||
return queuesAndStreamsPage.getQueuesTable()
|
||||
}, function(table) {
|
||||
return !findTableRow(table, function(row) {
|
||||
return row[2] === 'rabbit_mqtt_qos0_queue'
|
||||
})
|
||||
})
|
||||
|
||||
})
|
||||
|
||||
after(async function () {
|
||||
await teardown(driver, this, captureScreen)
|
||||
})
|
||||
})
|
Loading…
Reference in New Issue