STOMP: continue modernizing stomp.py test suites

This commit is contained in:
Michael Klishin 2021-03-12 09:31:39 +03:00
parent bc769343bb
commit 32814fb664
No known key found for this signature in database
GPG Key ID: E80EDCFA0CDB21EE
11 changed files with 190 additions and 113 deletions

View File

@ -11,14 +11,31 @@
all() ->
[
common,
ssl,
connect_options
%% This must use a dedicated node as they mess with plugin configuration in incompatible ways
{group, tls},
{group, implicit_connect},
{group, main}
].
init_per_suite(Config) ->
groups() ->
[
{main, [], [
main
]},
{implicit_connect, [], [
implicit_connect
]},
{tls, [], [
tls_connections
]}
].
init_per_group(_, Config) ->
Config1 = rabbit_ct_helpers:set_config(Config,
[{rmq_certspwd, "bunnychow"}]),
[
{rmq_nodename_suffix, ?MODULE},
{rmq_certspwd, "bunnychow"}
]),
rabbit_ct_helpers:log_environment(),
Config2 = rabbit_ct_helpers:run_setup_steps(
Config1,
@ -30,7 +47,7 @@ init_per_suite(Config) ->
rabbit_ct_helpers:make(Config2, StomppyDir, []),
Config2.
end_per_suite(Config) ->
end_per_group(_, Config) ->
rabbit_ct_helpers:run_teardown_steps(Config,
rabbit_ct_broker_helpers:teardown_steps()).
@ -41,14 +58,15 @@ end_per_testcase(Test, Config) ->
rabbit_ct_helpers:testcase_finished(Config, Test).
common(Config) ->
run(Config, filename:join("src", "test.py")).
main(Config) ->
run(Config, filename:join("src", "main_runner.py")).
connect_options(Config) ->
run(Config, filename:join("src", "test_connect_options.py")).
implicit_connect(Config) ->
run(Config, filename:join("src", "implicit_connect_runner.py")).
tls_connections(Config) ->
run(Config, filename:join("src", "tls_runner.py")).
ssl(Config) ->
run(Config, filename:join("src", "test_ssl.py")).
run(Config, Test) ->
DataDir = ?config(data_dir, Config),

View File

@ -258,7 +258,7 @@ class Latch(object):
self.cond.release()
def countdown(self):
self.cond.acquire()
self.cond.acquire(blocking=False)
if self.count > 0:
self.count -= 1
if self.count == 0:
@ -267,7 +267,7 @@ class Latch(object):
def wait(self, timeout=None):
try:
self.cond.acquire()
self.cond.acquire(blocking=False)
if self.count == 0:
return True
else:

View File

@ -9,50 +9,12 @@ import unittest
import stomp
import base
import time
import os
import threading
class TestLifecycle(base.BaseTest):
def test_unsubscribe_exchange_destination(self):
''' Test UNSUBSCRIBE command with exchange'''
d = "/exchange/amq.fanout"
self.unsub_test(d, self.sub_and_send(d))
def test_unsubscribe_exchange_destination_with_receipt(self):
''' Test receipted UNSUBSCRIBE command with exchange'''
d = "/exchange/amq.fanout"
self.unsub_test(d, self.sub_and_send(d, receipt="unsub.rct"), numRcts=1)
def test_unsubscribe_queue_destination(self):
''' Test UNSUBSCRIBE command with queue'''
d = "/queue/test_unsubscribe_queue_destination"
self.unsub_test(d, self.sub_and_send(d))
def test_unsubscribe_queue_destination_with_receipt(self):
''' Test receipted UNSUBSCRIBE command with queue'''
d = "/queue/test_unsubscribe_queue_destination_with_receipt"
self.unsub_test(d, self.sub_and_send(d, receipt="unsub.rct"), numRcts=1)
def test_unsubscribe_exchange_id(self):
''' Test UNSUBSCRIBE command with exchange by id'''
d = "/exchange/amq.fanout"
self.unsub_test(d, self.sub_and_send(d, subid="exchid"))
def test_unsubscribe_exchange_id_with_receipt(self):
''' Test receipted UNSUBSCRIBE command with exchange by id'''
d = "/exchange/amq.fanout"
self.unsub_test(d, self.sub_and_send(d, subid="exchid", receipt="unsub.rct"), numRcts=1)
def test_unsubscribe_queue_id(self):
''' Test UNSUBSCRIBE command with queue by id'''
d = "/queue/test_unsubscribe_queue_id"
self.unsub_test(d, self.sub_and_send(d, subid="queid"))
def test_unsubscribe_queue_id_with_receipt(self):
''' Test receipted UNSUBSCRIBE command with queue by id'''
d = "/queue/test_unsubscribe_queue_id_with_receipt"
self.unsub_test(d, self.sub_and_send(d, subid="queid", receipt="unsub.rct"), numRcts=1)
import test_util
class TestConnectDisconnect(base.BaseTest):
def test_connect_version_1_0(self):
''' Test CONNECT with version 1.0'''
self.conn.disconnect()
@ -80,6 +42,22 @@ class TestLifecycle(base.BaseTest):
finally:
new_conn.disconnect()
def test_default_user(self):
''' Default user connection '''
self.conn.disconnect()
test_util.enable_default_user()
listener = base.WaitableListener()
new_conn = stomp.Connection(host_and_ports=[('localhost', int(os.environ["STOMP_PORT"]))])
new_conn.set_listener('', listener)
new_conn.connect()
try:
self.assertFalse(listener.wait(3)) # no error back
self.assertTrue(new_conn.is_connected())
finally:
new_conn.disconnect()
test_util.disable_default_user()
def test_unsupported_version(self):
''' Test unsupported version on CONNECT command'''
self.bad_connect("Supported versions are 1.0,1.1,1.2\n", version='100.1')
@ -134,7 +112,9 @@ class TestLifecycle(base.BaseTest):
def test_disconnect(self):
''' Test DISCONNECT command'''
self.conn.disconnect()
self.await_condition(lambda: not self.conn.is_connected())
# Note: with modern-ish stomp.py versions, connection does not transition
# to the disconnected state immediately, and asserting on it in this test
# without a receipt makes no sense
def test_disconnect_with_receipt(self):
''' Test the DISCONNECT command with receipts '''
@ -147,29 +127,6 @@ class TestLifecycle(base.BaseTest):
self.assertEqual("test", receiptReceived
, "Wrong receipt received: '" + receiptReceived + "'")
def unsub_test(self, dest, verbs, numRcts=0):
def afterfun():
self.conn.send(dest, "after-test")
subverb, unsubverb = verbs
self.assertListenerAfter(subverb, numMsgs=1,
errMsg="FAILED to subscribe and send")
self.assertListenerAfter(unsubverb, numRcts=numRcts,
errMsg="Incorrect responses from UNSUBSCRIBE")
self.assertListenerAfter(afterfun,
errMsg="Still receiving messages")
def sub_and_send(self, dest, subid=None, receipt=None):
def subfun():
self.subscribe_dest(self.conn, dest, subid)
time.sleep(1)
self.conn.send(dest, "test")
def unsubfun():
headers = {}
if receipt != None:
headers['receipt'] = receipt
self.unsubscribe_dest(self.conn, dest, subid, **headers)
return subfun, unsubfun
if __name__ == '__main__':
import test_runner
modules = [

View File

@ -58,6 +58,19 @@ class TestQueue(base.BaseTest):
destination = '/queue/test'
self.simple_test_send_rec(destination)
def test_send_recv_header(self):
''' Test sending a custom header and receiving it back '''
dest = '/queue/custom-header'
hdrs = {'x-custom-header-1': 'value1',
'x-custom-header-2': 'value2',
'custom-header-3': 'value3'}
self.listener.reset(1)
recv_hdrs = self.simple_test_send_rec(dest, headers=hdrs)
self.assertEqual('value1', recv_hdrs['x-custom-header-1'])
self.assertEqual('value2', recv_hdrs['x-custom-header-2'])
self.assertEqual('value3', recv_hdrs['custom-header-3'])
def test_send_receive_in_other_conn(self):
''' Test send in one connection, receive in another '''
destination = '/queue/test2'

View File

@ -8,10 +8,16 @@
import unittest
import stomp
import base
import test_util
import time
import os
import threading
class TestConnectOptions(base.BaseTest):
import test_util
class TestImplicitConnect(base.BaseTest):
"""
Relies on implicit connect being enabled on the node
"""
def test_implicit_connect(self):
''' Implicit connect with receipt on first command '''
@ -35,17 +41,10 @@ class TestConnectOptions(base.BaseTest):
new_conn.disconnect()
test_util.disable_implicit_connect()
def test_default_user(self):
''' Default user connection '''
self.conn.disconnect()
test_util.enable_default_user()
listener = base.WaitableListener()
new_conn = stomp.Connection(host_and_ports=[('localhost', int(os.environ["STOMP_PORT"]))])
new_conn.set_listener('', listener)
new_conn.connect()
try:
self.assertFalse(listener.wait(3)) # no error back
self.assertTrue(new_conn.is_connected())
finally:
new_conn.disconnect()
test_util.disable_default_user()
if __name__ == '__main__':
import test_runner
modules = [
__name__
]
test_runner.run_unittests(modules)

View File

@ -0,0 +1,9 @@
#!/usr/bin/env python3
import test_runner
if __name__ == '__main__':
modules = [
'implicit_connect'
]
test_runner.run_unittests(modules)

View File

@ -6,7 +6,7 @@ if __name__ == '__main__':
modules = [
'parsing',
'errors',
'lifecycle',
'connect_disconnect',
'ack',
'amqp_headers',
'queue_properties',
@ -16,6 +16,7 @@ if __name__ == '__main__':
'destinations',
'redelivered',
'topic_permissions',
'unsubscribe',
'x_queue_type_quorum',
'x_queue_type_stream'
]

View File

@ -1,15 +0,0 @@
#!/usr/bin/env python3
## This Source Code Form is subject to the terms of the Mozilla Public
## License, v. 2.0. If a copy of the MPL was not distributed with this
## file, You can obtain one at https://mozilla.org/MPL/2.0/.
##
## Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
##
import test_runner
if __name__ == '__main__':
modules = ['connect_options']
test_runner.run_unittests(modules)

View File

@ -20,7 +20,7 @@ ssl_key_file = os.path.join(os.getenv('SSL_CERTS_PATH'), 'client', 'key.pem')
ssl_cert_file = os.path.join(os.getenv('SSL_CERTS_PATH'), 'client', 'cert.pem')
ssl_ca_certs = os.path.join(os.getenv('SSL_CERTS_PATH'), 'testca', 'cacert.pem')
class TestSslClient(unittest.TestCase):
class TestTLSConnection(unittest.TestCase):
def __ssl_connect(self):
conn = stomp.Connection(host_and_ports = [ ('localhost', int(os.environ["STOMP_PORT_TLS"])) ],
@ -77,3 +77,10 @@ class TestSslClient(unittest.TestCase):
self.assertEqual("Hello SSL!", listener.messages[0]['message'])
finally:
conn.disconnect()
if __name__ == '__main__':
import test_runner
modules = [
__name__
]
test_runner.run_unittests(modules)

View File

@ -11,7 +11,7 @@ import test_runner
import test_util
if __name__ == '__main__':
modules = ['ssl_lifecycle']
modules = ['tls_connect_disconnect']
test_util.ensure_ssl_auth_user()
test_runner.run_unittests(modules)

View File

@ -0,0 +1,88 @@
## This Source Code Form is subject to the terms of the Mozilla Public
## License, v. 2.0. If a copy of the MPL was not distributed with this
## file, You can obtain one at https://mozilla.org/MPL/2.0/.
##
## Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
##
import unittest
import stomp
import base
import time
import threading
class TestLifecycle(base.BaseTest):
def test_unsubscribe_exchange_destination(self):
''' Test UNSUBSCRIBE command with exchange'''
d = "/exchange/amq.fanout"
self.unsub_test(d, self.sub_and_send(d))
def test_unsubscribe_exchange_destination_with_receipt(self):
''' Test receipted UNSUBSCRIBE command with exchange'''
d = "/exchange/amq.fanout"
self.unsub_test(d, self.sub_and_send(d, receipt="unsub.rct"), numRcts=1)
def test_unsubscribe_queue_destination(self):
''' Test UNSUBSCRIBE command with queue'''
d = "/queue/test_unsubscribe_queue_destination"
self.unsub_test(d, self.sub_and_send(d))
def test_unsubscribe_queue_destination_with_receipt(self):
''' Test receipted UNSUBSCRIBE command with queue'''
d = "/queue/test_unsubscribe_queue_destination_with_receipt"
self.unsub_test(d, self.sub_and_send(d, receipt="unsub.rct"), numRcts=1)
def test_unsubscribe_exchange_id(self):
''' Test UNSUBSCRIBE command with exchange by id'''
d = "/exchange/amq.fanout"
self.unsub_test(d, self.sub_and_send(d, subid="exchid"))
def test_unsubscribe_exchange_id_with_receipt(self):
''' Test receipted UNSUBSCRIBE command with exchange by id'''
d = "/exchange/amq.fanout"
self.unsub_test(d, self.sub_and_send(d, subid="exchid", receipt="unsub.rct"), numRcts=1)
def test_unsubscribe_queue_id(self):
''' Test UNSUBSCRIBE command with queue by id'''
d = "/queue/test_unsubscribe_queue_id"
self.unsub_test(d, self.sub_and_send(d, subid="queid"))
def test_unsubscribe_queue_id_with_receipt(self):
''' Test receipted UNSUBSCRIBE command with queue by id'''
d = "/queue/test_unsubscribe_queue_id_with_receipt"
self.unsub_test(d, self.sub_and_send(d, subid="queid", receipt="unsub.rct"), numRcts=1)
##
## Helpers
##
def unsub_test(self, dest, verbs, numRcts=0):
def afterfun():
self.conn.send(dest, "after-test")
subverb, unsubverb = verbs
self.assertListenerAfter(subverb, numMsgs=1,
errMsg="FAILED to subscribe and send")
self.assertListenerAfter(unsubverb, numRcts=numRcts,
errMsg="Incorrect responses from UNSUBSCRIBE")
self.assertListenerAfter(afterfun,
errMsg="Still receiving messages")
def sub_and_send(self, dest, subid=None, receipt=None):
def subfun():
self.subscribe_dest(self.conn, dest, subid)
time.sleep(1)
self.conn.send(dest, "test")
def unsubfun():
headers = {}
if receipt != None:
headers['receipt'] = receipt
self.unsubscribe_dest(self.conn, dest, subid, **headers)
return subfun, unsubfun
if __name__ == '__main__':
import test_runner
modules = [
__name__
]
test_runner.run_unittests(modules)