Fix issue around rendering a mqtt qos0 queue
This commit is contained in:
parent
8e78c102c6
commit
cc86ffe30a
|
@ -1762,7 +1762,14 @@ function is_internal(queue) {
|
|||
}
|
||||
|
||||
function get_queue_type (queue) {
|
||||
return queue.type;
|
||||
switch(queue.type) {
|
||||
case "classic":
|
||||
case "quorum":
|
||||
case "stream":
|
||||
return queue.type;
|
||||
default:
|
||||
return "default"
|
||||
}
|
||||
}
|
||||
|
||||
function is_quorum(queue) {
|
||||
|
|
|
@ -14,6 +14,8 @@ authnz-mgt/oauth-with-keycloak.sh
|
|||
authnz-mgt/oauth-with-keycloak-with-verify-none.sh
|
||||
authnz-mgt/oauth-with-uaa-down-but-with-basic-auth.sh
|
||||
authnz-mgt/oauth-with-uaa-down.sh
|
||||
mgt/amqp10-connections.sh
|
||||
mgt/mqtt-connections.sh
|
||||
mgt/vhosts.sh
|
||||
mgt/definitions.sh
|
||||
mgt/exchanges.sh
|
||||
|
|
|
@ -8,3 +8,4 @@ mgt/exchanges.sh
|
|||
mgt/queuesAndStreams.sh
|
||||
mgt/limits.sh
|
||||
mgt/amqp10-connections.sh
|
||||
mgt/mqtt-connections.sh
|
|
@ -0,0 +1,9 @@
|
|||
#!/usr/bin/env bash
|
||||
|
||||
SCRIPT="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
|
||||
|
||||
TEST_CASES_PATH=/connections/mqtt
|
||||
TEST_CONFIG_PATH=/basic-auth
|
||||
|
||||
source $SCRIPT/../../bin/suite_template $@
|
||||
run
|
|
@ -1,2 +1,3 @@
|
|||
[rabbitmq_management,rabbitmq_stream,rabbitmq_stream_common,rabbitmq_stream_management,
|
||||
rabbitmq_top,rabbitmq_tracing,rabbitmq_federation_management,rabbitmq_shovel_management].
|
||||
rabbitmq_top,rabbitmq_tracing,rabbitmq_federation_management,rabbitmq_shovel_management,
|
||||
rabbitmq_mqtt].
|
||||
|
|
|
@ -0,0 +1,67 @@
|
|||
const { By, Key, until, Builder } = require('selenium-webdriver')
|
||||
require('chromedriver')
|
||||
const assert = require('assert')
|
||||
const { buildDriver, goToHome, captureScreensFor, teardown, doUntil } = require('../../utils')
|
||||
const { openConnection, getConnectionOptions } = require('../../mqtt')
|
||||
|
||||
const LoginPage = require('../../pageobjects/LoginPage')
|
||||
const OverviewPage = require('../../pageobjects/OverviewPage')
|
||||
const ConnectionsPage = require('../../pageobjects/ConnectionsPage');
|
||||
|
||||
|
||||
describe('List MQTT connections', function () {
|
||||
let login
|
||||
let overview
|
||||
let captureScreen
|
||||
let mqttClient
|
||||
|
||||
before(async function () {
|
||||
driver = buildDriver()
|
||||
await goToHome(driver)
|
||||
login = new LoginPage(driver)
|
||||
overview = new OverviewPage(driver)
|
||||
connectionsPage = new ConnectionsPage(driver)
|
||||
captureScreen = captureScreensFor(driver, __filename)
|
||||
|
||||
await login.login('management', 'guest')
|
||||
if (!await overview.isLoaded()) {
|
||||
throw new Error('Failed to login')
|
||||
}
|
||||
|
||||
})
|
||||
|
||||
it('mqtt 5.0 connection', async function () {
|
||||
mqttClient = openConnection(getConnectionOptions())
|
||||
|
||||
let connected = new Promise((resolve, reject) => {
|
||||
mqttClient.on('error', function(err) {
|
||||
reject(err)
|
||||
assert.fail("Mqtt connection failed due to " + err)
|
||||
}),
|
||||
mqttClient.on('connect', function(err2) {
|
||||
resolve("ok")
|
||||
})
|
||||
})
|
||||
assert.equal("ok", await connected)
|
||||
|
||||
try {
|
||||
await overview.clickOnConnectionsTab()
|
||||
|
||||
let table = await doUntil(async function() {
|
||||
return connectionsPage.getConnectionsTable()
|
||||
}, function(table) {
|
||||
return table.length > 0
|
||||
}, 6000)
|
||||
assert.equal(table[0][5], "MQTT 5-0")
|
||||
|
||||
} finally {
|
||||
if (mqttClient) mqttClient.end()
|
||||
}
|
||||
|
||||
})
|
||||
|
||||
after(async function () {
|
||||
await teardown(driver, this, captureScreen)
|
||||
|
||||
})
|
||||
})
|
|
@ -76,9 +76,12 @@ describe('Exchange management', function () {
|
|||
return table.length > 0
|
||||
})
|
||||
|
||||
log("Opening selectable columns popup...")
|
||||
await exchanges.clickOnSelectTableColumns()
|
||||
log("Getting all selectable dolumns ...")
|
||||
let table = await exchanges.getSelectableTableColumns()
|
||||
|
||||
log("Asserting selectable dolumns ...")
|
||||
let overviewGroup = {
|
||||
"name" : "Overview:",
|
||||
"columns": [
|
||||
|
|
|
@ -0,0 +1,42 @@
|
|||
const mqtt = require('mqtt')
|
||||
|
||||
module.exports = {
|
||||
|
||||
openConnection: (mqttOptions) => {
|
||||
let rabbit = process.env.RABBITMQ_HOSTNAME || 'localhost'
|
||||
let mqttUrl = process.env.RABBITMQ_MQTT_URL || "mqtt://" + rabbit + ":1883"
|
||||
return mqtt.connect(mqttUrl, mqttOptions)
|
||||
},
|
||||
getConnectionOptions: () => {
|
||||
let mqttProtocol = process.env.MQTT_PROTOCOL || 'mqtt'
|
||||
let usemtls = process.env.MQTT_USE_MTLS || false
|
||||
let username = process.env.RABBITMQ_AMQP_USERNAME || 'management'
|
||||
let password = process.env.RABBITMQ_AMQP_PASSWORD || 'guest'
|
||||
let client_id = process.env.RABBITMQ_AMQP_USERNAME || 'selenium-client'
|
||||
|
||||
mqttOptions = {
|
||||
clientId: client_id,
|
||||
protocolId: 'MQTT',
|
||||
protocol: mqttProtocol,
|
||||
protocolVersion: 5,
|
||||
keepalive: 10000,
|
||||
clean: false,
|
||||
reconnectPeriod: '1000',
|
||||
properties: {
|
||||
sessionExpiryInterval: 0
|
||||
}
|
||||
}
|
||||
|
||||
if (mqttProtocol == 'mqtts') {
|
||||
mqttOptions["ca"] = [fs.readFileSync(process.env.RABBITMQ_CERTS + "/ca_rabbitmq_certificate.pem")]
|
||||
}
|
||||
if (usemtls) {
|
||||
mqttOptions["cert"] = fs.readFileSync(process.env.RABBITMQ_CERTS + "/client_rabbitmq_certificate.pem")
|
||||
mqttOptions["key"] = fs.readFileSync(process.env.RABBITMQ_CERTS + "/client_rabbitmq_key.pem")
|
||||
} else {
|
||||
mqttOptions["username"] = username
|
||||
mqttOptions["password"] = password
|
||||
}
|
||||
return mqttOptions
|
||||
}
|
||||
}
|
|
@ -0,0 +1,148 @@
|
|||
const { By, Key, until, Builder } = require('selenium-webdriver')
|
||||
require('chromedriver')
|
||||
const assert = require('assert')
|
||||
const { buildDriver, goToHome, goToQueue, captureScreensFor, teardown, doUntil, findTableRow } = require('../utils')
|
||||
const { createQueue, deleteQueue, getManagementUrl, basicAuthorization } = require('../mgt-api')
|
||||
const mqtt = require('mqtt')
|
||||
|
||||
const LoginPage = require('../pageobjects/LoginPage')
|
||||
const OverviewPage = require('../pageobjects/OverviewPage')
|
||||
const QueuesAndStreamsPage = require('../pageobjects/QueuesAndStreamsPage')
|
||||
const QueuePage = require('../pageobjects/QueuePage')
|
||||
const ConnectionsPage = require('../pageobjects/ConnectionsPage');
|
||||
|
||||
|
||||
describe('Given a mqtt 5.0 connection with a qos 0 subscription with zero sessionExpiryInterval', function () {
|
||||
let login
|
||||
let queuesAndStreams
|
||||
let queuePage
|
||||
let overview
|
||||
let captureScreen
|
||||
let queueName
|
||||
let mqttOptions
|
||||
|
||||
let mqttProtocol = process.env.MQTT_PROTOCOL || 'mqtt'
|
||||
let usemtls = process.env.MQTT_USE_MTLS || false
|
||||
let rabbit = process.env.RABBITMQ_HOSTNAME || 'localhost'
|
||||
let mqttUrl = process.env.RABBITMQ_MQTT_URL || "mqtt://" + rabbit + ":1883"
|
||||
let username = process.env.RABBITMQ_AMQP_USERNAME || 'management'
|
||||
let password = process.env.RABBITMQ_AMQP_PASSWORD || 'guest'
|
||||
let client_id = process.env.RABBITMQ_AMQP_USERNAME || 'selenium-client'
|
||||
let mqttClient
|
||||
|
||||
before(async function () {
|
||||
driver = buildDriver()
|
||||
await goToHome(driver)
|
||||
login = new LoginPage(driver)
|
||||
overview = new OverviewPage(driver)
|
||||
queuePage = new QueuePage(driver)
|
||||
connectionsPage = new ConnectionsPage(driver)
|
||||
queuesAndStreamsPage = new QueuesAndStreamsPage(driver)
|
||||
captureScreen = captureScreensFor(driver, __filename)
|
||||
|
||||
await login.login('management', 'guest')
|
||||
if (!await overview.isLoaded()) {
|
||||
throw new Error('Failed to login')
|
||||
}
|
||||
//await overview.selectRefreshOption("Do not refresh")
|
||||
|
||||
queueName = "test_" + Math.floor(Math.random() * 1000)
|
||||
createQueue(getManagementUrl(), basicAuthorization("management", "guest"),
|
||||
"/", queueName, {
|
||||
"x-queue-type": "quorum"
|
||||
})
|
||||
|
||||
mqttOptions = {
|
||||
clientId: client_id,
|
||||
protocolId: 'MQTT',
|
||||
protocol: mqttProtocol,
|
||||
protocolVersion: 5,
|
||||
keepalive: 10000,
|
||||
clean: false,
|
||||
reconnectPeriod: '1000',
|
||||
properties: {
|
||||
sessionExpiryInterval: 0
|
||||
}
|
||||
}
|
||||
if (mqttProtocol == 'mqtts') {
|
||||
mqttOptions["ca"] = [fs.readFileSync(process.env.RABBITMQ_CERTS + "/ca_rabbitmq_certificate.pem")]
|
||||
}
|
||||
if (usemtls) {
|
||||
mqttOptions["cert"] = fs.readFileSync(process.env.RABBITMQ_CERTS + "/client_rabbitmq_certificate.pem")
|
||||
mqttOptions["key"] = fs.readFileSync(process.env.RABBITMQ_CERTS + "/client_rabbitmq_key.pem")
|
||||
} else {
|
||||
mqttOptions["username"] = username
|
||||
mqttOptions["password"] = password
|
||||
}
|
||||
|
||||
mqttClient = mqtt.connect(mqttUrl, mqttOptions)
|
||||
let subscribed = new Promise((resolve, reject) => {
|
||||
mqttClient.on('error', function(err) {
|
||||
reject(err)
|
||||
assert.fail("Mqtt connection failed due to " + err)
|
||||
}),
|
||||
mqttClient.on('connect', function(err) {
|
||||
mqttClient.subscribe(queueName, {qos:0}, function (err2) {
|
||||
if (!err2) {
|
||||
resolve("ok")
|
||||
}else {
|
||||
reject(err2)
|
||||
}
|
||||
})
|
||||
})
|
||||
})
|
||||
assert.equal("ok", await subscribed)
|
||||
|
||||
})
|
||||
|
||||
it('should be an mqtt connection listed', async function () {
|
||||
await overview.clickOnConnectionsTab()
|
||||
|
||||
let table = await doUntil(async function() {
|
||||
return connectionsPage.getConnectionsTable()
|
||||
}, function(table) {
|
||||
return table.length > 0
|
||||
}, 6000)
|
||||
assert.equal(table[0][5], "MQTT 5-0")
|
||||
|
||||
})
|
||||
|
||||
it('should be an mqtt qos0 queue listed', async function () {
|
||||
await overview.clickOnQueuesTab()
|
||||
|
||||
await doUntil(function() {
|
||||
return queuesAndStreamsPage.getQueuesTable()
|
||||
}, function(table) {
|
||||
return findTableRow(table, function(row) {
|
||||
return row[2] === 'rabbit_mqtt_qos0_queue'
|
||||
})
|
||||
})
|
||||
|
||||
})
|
||||
|
||||
it('can view mqtt qos0 queue', async function () {
|
||||
await overview.clickOnQueuesTab()
|
||||
|
||||
let table = await doUntil(function() {
|
||||
return queuesAndStreamsPage.getQueuesTable()
|
||||
}, function(t) {
|
||||
return findTableRow(t, function(row) {
|
||||
return row[2] === 'rabbit_mqtt_qos0_queue'
|
||||
})
|
||||
})
|
||||
let mqttQueueName = findTableRow(table, function(row) {
|
||||
return row[2] === 'rabbit_mqtt_qos0_queue'
|
||||
})[1]
|
||||
|
||||
await goToQueue(driver, "/", mqttQueueName)
|
||||
await queuePage.isLoaded()
|
||||
|
||||
})
|
||||
|
||||
after(async function () {
|
||||
await teardown(driver, this, captureScreen)
|
||||
if (mqttClient) mqttClient.end()
|
||||
deleteQueue(getManagementUrl(), basicAuthorization("management", "guest"),
|
||||
"/", queueName)
|
||||
})
|
||||
})
|
|
@ -83,7 +83,7 @@ describe('Given a quorum queue configured with SAC', function () {
|
|||
ch1Consumer = ch1.consume(queueName, (msg) => {}, {consumerTag: "one"})
|
||||
})
|
||||
|
||||
it('it should have one consumer as active', async function() {
|
||||
it('it should have one consumer listed as active', async function() {
|
||||
await doUntil(async function() {
|
||||
await queuePage.refresh()
|
||||
await queuePage.isLoaded()
|
||||
|
@ -111,7 +111,7 @@ describe('Given a quorum queue configured with SAC', function () {
|
|||
ch2Consumer = ch2.consume(queueName, (msg) => {}, {consumerTag: "two", priority: 10})
|
||||
})
|
||||
|
||||
it('the latter consumer should be active and the former waiting', async function() {
|
||||
it('the latter consumer should be listed as active and the former waiting', async function() {
|
||||
|
||||
await doUntil(async function() {
|
||||
await queuePage.refresh()
|
||||
|
@ -177,7 +177,7 @@ describe('Given a quorum queue configured with SAC', function () {
|
|||
ch1Consumer = ch1.consume(queueName, (msg) => {}, {consumerTag: "one", priority: 10})
|
||||
})
|
||||
|
||||
it('it should have one consumer as active', async function() {
|
||||
it('it should have one consumer listed as active', async function() {
|
||||
await doUntil(async function() {
|
||||
await queuePage.refresh()
|
||||
await queuePage.isLoaded()
|
||||
|
|
|
@ -126,10 +126,18 @@ module.exports = {
|
|||
return d.driver.get(d.baseUrl + '#/login?access_token=' + token)
|
||||
},
|
||||
|
||||
goToConnections: (d) => {
|
||||
return d.driver.get(d.baseUrl + '#/connections')
|
||||
},
|
||||
|
||||
goToExchanges: (d) => {
|
||||
return d.driver.get(d.baseUrl + '#/exchanges')
|
||||
},
|
||||
|
||||
goToQueues: (d) => {
|
||||
return d.driver.get(d.baseUrl + '#/queues')
|
||||
},
|
||||
|
||||
goToQueue(d, vhost, queue) {
|
||||
return d.driver.get(d.baseUrl + '#/queues/' + encodeURIComponent(vhost) + '/' + encodeURIComponent(queue))
|
||||
},
|
||||
|
|
Loading…
Reference in New Issue