STOMP: further stomp.py suite modernization steps

This commit is contained in:
Michael Klishin 2021-03-12 07:25:08 +03:00
parent 17cb24deb4
commit a5cd08394e
No known key found for this signature in database
GPG Key ID: E80EDCFA0CDB21EE
18 changed files with 390 additions and 294 deletions

View File

@ -11,16 +11,14 @@
all() ->
[
common,
ssl,
connect_options
common,
ssl,
connect_options
].
init_per_testcase(TestCase, Config) ->
Suffix = rabbit_ct_helpers:testcase_absname(Config, TestCase, "-"),
init_per_suite(Config) ->
Config1 = rabbit_ct_helpers:set_config(Config,
[{rmq_certspwd, "bunnychow"},
{rmq_nodename_suffix, Suffix}]),
[{rmq_certspwd, "bunnychow"}]),
rabbit_ct_helpers:log_environment(),
Config2 = rabbit_ct_helpers:run_setup_steps(
Config1,
@ -32,10 +30,16 @@ init_per_testcase(TestCase, Config) ->
rabbit_ct_helpers:make(Config2, StomppyDir, []),
Config2.
end_per_testcase(_, Config) ->
end_per_suite(Config) ->
rabbit_ct_helpers:run_teardown_steps(Config,
rabbit_ct_broker_helpers:teardown_steps()).
init_per_testcase(Test, Config) ->
rabbit_ct_helpers:testcase_started(Config, Test).
end_per_testcase(Test, Config) ->
rabbit_ct_helpers:testcase_finished(Config, Test).
common(Config) ->
run(Config, filename:join("src", "test.py")).

View File

@ -24,7 +24,7 @@ class TestAck(base.BaseTest):
self.conn.send(destination, "test1")
self.conn.send(destination, "test2")
self.assertTrue(self.listener.wait(4), "initial message not received")
self.assertEquals(2, len(self.listener.messages))
self.assertEqual(2, len(self.listener.messages))
# disconnect with no ack
self.conn.disconnect()
@ -39,7 +39,7 @@ class TestAck(base.BaseTest):
ack='client',
headers={'prefetch-count': '10'})
self.assertTrue(listener2.wait(), "message not received again")
self.assertEquals(2, len(listener2.messages))
self.assertEqual(2, len(listener2.messages))
# now ack only the last message - expecting cumulative behaviour
mid = listener2.messages[1]['headers'][self.ack_id_source_header]
@ -69,7 +69,7 @@ class TestAck(base.BaseTest):
self.conn.send(destination, "test1")
self.conn.send(destination, "test2")
self.assertTrue(self.listener.wait(4), "Both initial messages not received")
self.assertEquals(2, len(self.listener.messages))
self.assertEqual(2, len(self.listener.messages))
# disconnect without acks
self.conn.disconnect()
@ -84,7 +84,7 @@ class TestAck(base.BaseTest):
ack='client-individual',
headers={'prefetch-count': '10'})
self.assertTrue(listener2.wait(2.5), "Did not receive 2 messages")
self.assertEquals(2, len(listener2.messages), "Not exactly 2 messages received")
self.assertEqual(2, len(listener2.messages), "Not exactly 2 messages received")
# now ack only the 'test2' message - expecting individual behaviour
nummsgs = len(listener2.messages)
@ -92,7 +92,7 @@ class TestAck(base.BaseTest):
for ind in range(nummsgs):
if listener2.messages[ind]['message']=="test2":
mid = listener2.messages[ind]['headers'][self.ack_id_source_header]
self.assertEquals(1, ind, 'Expecting test2 to be second message')
self.assertEqual(1, ind, 'Expecting test2 to be second message')
break
self.assertTrue(mid, "Did not find test2 message id.")
self.ack_message(conn2, mid, None)
@ -108,8 +108,8 @@ class TestAck(base.BaseTest):
self.subscribe_dest(conn3, destination, None)
self.assertFalse(listener3.wait(2.5),
"Expected to see only one message. ACK not working?")
self.assertEquals(1, len(listener3.messages), "Expecting exactly one message")
self.assertEquals("test1", listener3.messages[0]['message'], "Unexpected message remains")
self.assertEqual(1, len(listener3.messages), "Expecting exactly one message")
self.assertEqual("test1", listener3.messages[0]['message'], "Unexpected message remains")
finally:
conn3.disconnect()
@ -121,7 +121,7 @@ class TestAck(base.BaseTest):
self.subscribe_dest(self.conn, destination, None, ack='client')
self.conn.send(destination, "test")
self.assertTrue(self.listener.wait(3), "initial message not received")
self.assertEquals(1, len(self.listener.messages))
self.assertEqual(1, len(self.listener.messages))
# disconnect with no ack
self.conn.disconnect()
@ -135,7 +135,7 @@ class TestAck(base.BaseTest):
conn2.begin(transaction=tx)
self.subscribe_dest(conn2, destination, None, ack='client')
self.assertTrue(listener2.wait(), "message not received again")
self.assertEquals(1, len(listener2.messages))
self.assertEqual(1, len(listener2.messages))
# now ack
mid = listener2.messages[0]['headers'][self.ack_id_source_header]
@ -171,7 +171,7 @@ class TestAck(base.BaseTest):
self.assertFalse(self.listener.wait(3),
"Should not have been able to see 6 messages")
self.assertEquals(5, len(self.listener.messages))
self.assertEqual(5, len(self.listener.messages))
def test_nack(self):
destination = "/queue/nack-test"
@ -236,7 +236,7 @@ class TestAck11(TestAck):
return conn
def test_version(self):
self.assertEquals('1.1', self.conn.version)
self.assertEqual('1.1', self.conn.version)
class TestAck12(TestAck):
@ -249,4 +249,13 @@ class TestAck12(TestAck):
return conn
def test_version(self):
self.assertEquals('1.2', self.conn.version)
self.assertEqual('1.2', self.conn.version)
if __name__ == '__main__':
import test_runner
modules = [
__name__
]
test_runner.run_unittests(modules)

View File

@ -31,12 +31,20 @@ class TestAmqpHeaders(base.BaseTest):
# check if we receive the message from the STOMP subscription
self.assertTrue(self.listener.wait(2), "initial message not received")
self.assertEquals(1, len(self.listener.messages))
self.assertEqual(1, len(self.listener.messages))
msg = self.listener.messages[0]
self.assertEquals('Hello World!', msg['message'])
self.assertEquals('value1', msg['headers']['x-custom-hdr-1'])
self.assertEquals('value2', msg['headers']['x-custom-hdr-2'])
self.assertEquals('value3', msg['headers']['custom-hdr-3'])
self.assertEqual('Hello World!', msg['message'])
self.assertEqual('value1', msg['headers']['x-custom-hdr-1'])
self.assertEqual('value2', msg['headers']['x-custom-hdr-2'])
self.assertEqual('value3', msg['headers']['custom-hdr-3'])
self.conn.disconnect()
amqp_conn.close()
if __name__ == '__main__':
import test_runner
modules = [
__name__
]
test_runner.run_unittests(modules)

View File

@ -1,8 +1,8 @@
## This Source Code Form is subject to the terms of the Mozilla Public
## License, v. 2.0. If a copy of the MPL was not distributed with this
## file, You can obtain one at https://mozilla.org/MPL/2.0/.
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
##
## Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
# Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
##
import unittest
@ -10,113 +10,126 @@ import stomp
import sys
import threading
import os
import time
import random
class BaseTest(unittest.TestCase):
def await_condition(self, condition, timeout=5):
cond = threading.Condition()
try:
cond.acquire()
cond.wait_for(lambda: condition(), timeout)
finally:
cond.release()
def create_connection_obj(self, version='1.0', vhost='/', heartbeats=(0, 0)):
if version == '1.0':
conn = stomp.StompConnection10(host_and_ports=[('localhost', int(os.environ["STOMP_PORT"]))])
self.ack_id_source_header = 'message-id'
self.ack_id_header = 'message-id'
elif version == '1.1':
conn = stomp.StompConnection11(host_and_ports=[('localhost', int(os.environ["STOMP_PORT"]))],
vhost=vhost,
heartbeats=heartbeats)
self.ack_id_source_header = 'message-id'
self.ack_id_header = 'message-id'
elif version == '1.2':
conn = stomp.StompConnection12(host_and_ports=[('localhost', int(os.environ["STOMP_PORT"]))],
vhost=vhost,
heartbeats=heartbeats)
self.ack_id_source_header = 'ack'
self.ack_id_header = 'id'
else:
conn = stomp.StompConnection12(host_and_ports=[('localhost', int(os.environ["STOMP_PORT"]))],
vhost=vhost,
heartbeats=heartbeats)
conn.version = version
return conn
def create_connection_obj(self, version='1.0', vhost='/', heartbeats=(0, 0)):
if version == '1.0':
conn = stomp.StompConnection10(
host_and_ports=[('localhost', int(os.environ["STOMP_PORT"]))])
self.ack_id_source_header = 'message-id'
self.ack_id_header = 'message-id'
elif version == '1.1':
conn = stomp.StompConnection11(host_and_ports=[('localhost', int(os.environ["STOMP_PORT"]))],
vhost=vhost,
heartbeats=heartbeats)
self.ack_id_source_header = 'message-id'
self.ack_id_header = 'message-id'
elif version == '1.2':
conn = stomp.StompConnection12(host_and_ports=[('localhost', int(os.environ["STOMP_PORT"]))],
vhost=vhost,
heartbeats=heartbeats)
self.ack_id_source_header = 'ack'
self.ack_id_header = 'id'
else:
conn = stomp.StompConnection12(host_and_ports=[('localhost', int(os.environ["STOMP_PORT"]))],
vhost=vhost,
heartbeats=heartbeats)
conn.version = version
return conn
def create_connection(self, user='guest', passcode='guest', wait=True, **kwargs):
conn = self.create_connection_obj(**kwargs)
conn.connect(user, passcode, wait=wait)
return conn
def create_connection(self, user='guest', passcode='guest', wait=True, **kwargs):
conn = self.create_connection_obj(**kwargs)
conn.connect(user, passcode, wait=wait)
return conn
def subscribe_dest(self, conn, destination, sub_id, **kwargs):
if type(conn) is stomp.StompConnection10:
# 'id' is optional in STOMP 1.0.
if sub_id != None:
kwargs['id'] = sub_id
conn.subscribe(destination, **kwargs)
else:
# 'id' is required in STOMP 1.1+.
if sub_id == None:
sub_id = 'ctag'
conn.subscribe(destination, sub_id, **kwargs)
def subscribe_dest(self, conn, destination, sub_id, **kwargs):
if type(conn) is stomp.StompConnection10:
# 'id' is optional in STOMP 1.0.
if sub_id != None:
kwargs['id'] = sub_id
conn.subscribe(destination, **kwargs)
else:
# 'id' is required in STOMP 1.1+.
if sub_id == None:
sub_id = 'ctag'
conn.subscribe(destination, sub_id, **kwargs)
def unsubscribe_dest(self, conn, destination, sub_id, **kwargs):
if type(conn) is stomp.StompConnection10:
# 'id' is optional in STOMP 1.0.
if sub_id != None:
conn.unsubscribe(id=sub_id, **kwargs)
else:
conn.unsubscribe(destination=destination, **kwargs)
else:
# 'id' is required in STOMP 1.1+.
if sub_id == None:
sub_id = 'ctag'
conn.unsubscribe(sub_id, **kwargs)
def unsubscribe_dest(self, conn, destination, sub_id, **kwargs):
if type(conn) is stomp.StompConnection10:
# 'id' is optional in STOMP 1.0.
if sub_id != None:
conn.unsubscribe(id=sub_id, **kwargs)
else:
conn.unsubscribe(destination=destination, **kwargs)
else:
# 'id' is required in STOMP 1.1+.
if sub_id == None:
sub_id = 'stomp-sub-id {}'.format(random.randint(0, 1000))
conn.unsubscribe(sub_id, **kwargs)
def ack_message(self, conn, msg_id, sub_id, **kwargs):
if type(conn) is stomp.StompConnection10:
conn.ack(msg_id, **kwargs)
elif type(conn) is stomp.StompConnection11:
if sub_id == None:
sub_id = 'ctag'
conn.ack(msg_id, sub_id, **kwargs)
elif type(conn) is stomp.StompConnection12:
conn.ack(msg_id, **kwargs)
def ack_message(self, conn, msg_id, sub_id, **kwargs):
if type(conn) is stomp.StompConnection10:
conn.ack(msg_id, **kwargs)
elif type(conn) is stomp.StompConnection11:
if sub_id == None:
sub_id = 'stomp-sub-id {}'.format(random.randint(0, 1000))
conn.ack(msg_id, sub_id, **kwargs)
elif type(conn) is stomp.StompConnection12:
conn.ack(msg_id, **kwargs)
def nack_message(self, conn, msg_id, sub_id, **kwargs):
if type(conn) is stomp.StompConnection10:
# Normally unsupported by STOMP 1.0.
conn.send_frame("NACK", {"message-id": msg_id})
elif type(conn) is stomp.StompConnection11:
if sub_id == None:
sub_id = 'ctag'
conn.nack(msg_id, sub_id, **kwargs)
elif type(conn) is stomp.StompConnection12:
conn.nack(msg_id, **kwargs)
def nack_message(self, conn, msg_id, sub_id, **kwargs):
if type(conn) is stomp.StompConnection10:
# Normally unsupported by STOMP 1.0.
conn.send_frame("NACK", {"message-id": msg_id})
elif type(conn) is stomp.StompConnection11:
if sub_id == None:
sub_id = 'stomp-sub-id {}'.format(random.randint(0, 1000))
conn.nack(msg_id, sub_id, **kwargs)
elif type(conn) is stomp.StompConnection12:
conn.nack(msg_id, **kwargs)
def create_subscriber_connection(self, dest):
conn = self.create_connection()
listener = WaitableListener()
conn.set_listener('', listener)
self.subscribe_dest(conn, dest, None, receipt="sub.receipt")
listener.wait()
self.assertEquals(1, len(listener.receipts))
listener.reset()
return conn, listener
def create_subscriber_connection(self, dest):
conn = self.create_connection()
listener = WaitableListener()
conn.set_listener('', listener)
self.subscribe_dest(conn, dest, None, receipt="sub.receipt")
listener.wait()
self.assertEqual(1, len(listener.receipts))
listener.reset()
return conn, listener
def setUp(self):
# Note: useful for debugging
# import stomp.listener
def setUp(self):
# Note: useful for debugging
# import stomp.listener
self.conn = self.create_connection()
self.listener = WaitableListener()
self.conn.set_listener('waitable', self.listener)
# Note: useful for debugging
# self.printing_listener = stomp.listener.PrintingListener()
# self.conn.set_listener('printing', self.printing_listener)
self._started_at = time.time()
def tearDown(self):
def tearDown(self):
if self.conn.is_connected():
try:
self.conn.disconnect()
except:
pass
elapsed = time.time() - self._started_at
print('{} ({}s)'.format(self.id(), round(elapsed, 2)))
def simple_test_send_rec(self, dest, headers={}):
def simple_test_send_rec(self, dest, headers={}):
self.listener.reset()
self.subscribe_dest(self.conn, dest, None)
@ -130,11 +143,11 @@ class BaseTest(unittest.TestCase):
# check header content
msg = self.listener.messages[0]
self.assertEquals("foo", msg['message'])
self.assertEquals(dest, msg['headers']['destination'])
self.assertEqual("foo", msg['message'])
self.assertEqual(dest, msg['headers']['destination'])
return msg['headers']
def assertListener(self, errMsg, numMsgs=0, numErrs=0, numRcts=0, timeout=10):
def assertListener(self, errMsg, numMsgs=0, numErrs=0, numRcts=0, timeout=10):
if numMsgs + numErrs + numRcts > 0:
self._assertTrue(self.listener.wait(timeout), errMsg + " (#awaiting)")
else:
@ -143,26 +156,28 @@ class BaseTest(unittest.TestCase):
self._assertEquals(numErrs, len(self.listener.errors), errMsg + " (#errors)")
self._assertEquals(numRcts, len(self.listener.receipts), errMsg + " (#receipts)")
def _assertTrue(self, bool, msg):
if not bool:
self.listener.print_state(msg, True)
self.assertTrue(bool, msg)
def _assertTrue(self, bool, msg):
if not bool:
self.listener.print_state(msg, True)
self.assertTrue(bool, msg)
def _assertFalse(self, bool, msg):
if bool:
self.listener.print_state(msg, True)
self.assertFalse(bool, msg)
def _assertFalse(self, bool, msg):
if bool:
self.listener.print_state(msg, True)
self.assertFalse(bool, msg)
def _assertEquals(self, expected, actual, msg):
if expected != actual:
self.listener.print_state(msg, True)
self.assertEquals(expected, actual, msg)
def _assertEquals(self, expected, actual, msg):
if expected != actual:
self.listener.print_state(msg, True)
self.assertEqual(expected, actual, msg)
def assertListenerAfter(self, verb, errMsg="", numMsgs=0, numErrs=0, numRcts=0, timeout=5):
def assertListenerAfter(self, verb, errMsg="", numMsgs=0, numErrs=0, numRcts=0, timeout=5):
num = numMsgs + numErrs + numRcts
self.listener.reset(num if num>0 else 1)
self.listener.reset(num if num > 0 else 1)
verb()
self.assertListener(errMsg=errMsg, numMsgs=numMsgs, numErrs=numErrs, numRcts=numRcts, timeout=timeout)
self.assertListener(errMsg=errMsg, numMsgs=numMsgs,
numErrs=numErrs, numRcts=numRcts, timeout=timeout)
class WaitableListener(object):
@ -182,7 +197,7 @@ class WaitableListener(object):
def _append(self, array, msg, hdrs):
mno = self._next_msg_no()
array.append({'message' : msg, 'headers' : hdrs, 'msg_no' : mno})
array.append({'message': msg, 'headers' : hdrs, 'msg_no' : mno})
self.latch.countdown()
def on_receipt(self, headers, message):
@ -211,9 +226,15 @@ class WaitableListener(object):
if self.debug:
self.print_state('(reset listener--new state)')
def wait(self, timeout=10):
def wait(self, timeout=4):
return self.latch.wait(timeout)
def wait_for(self, condition, timeout=5):
return self.latch.wait_for(condition, timeout)
def wait_for_complete_countdown(self, timeout=5):
return self.latch.wait_for_complete_countdown(timeout)
def print_state(self, hdr="", full=False):
print(hdr)
print('#messages: {}'.format(len(self.messages)))
@ -221,40 +242,54 @@ class WaitableListener(object):
print('#receipts: {}'.format(len(self.receipts)))
print('Remaining count: {}'.format(self.latch.get_count()))
if full:
if len(self.messages) != 0: print('Messages: {}'.format(self.messages))
if len(self.messages) != 0:
print('Messages: {}'.format(self.messages))
if len(self.errors) != 0: print('Messages: {}'.format(self.errors))
if len(self.receipts) != 0: print('Messages: {}'.format(self.receipts))
if len(self.receipts) != 0:
print('Messages: {}'.format(self.receipts))
class Latch(object):
def __init__(self, count=1):
self.cond = threading.Condition()
self.cond.acquire()
self.count = count
self.cond.release()
def __init__(self, count=1):
self.cond = threading.Condition()
self.cond.acquire()
self.count = count
self.cond.release()
def countdown(self):
self.cond.acquire()
if self.count > 0:
self.count -= 1
if self.count == 0:
self.cond.notify_all()
self.cond.release()
def countdown(self):
self.cond.acquire()
if self.count > 0:
self.count -= 1
if self.count == 0:
self.cond.notify_all()
self.cond.release()
def wait(self, timeout=None):
try:
self.cond.acquire()
if self.count == 0:
return True
else:
self.cond.wait(timeout)
return self.count == 0
finally:
self.cond.release()
def wait(self, timeout=None):
try:
self.cond.acquire()
if self.count == 0:
return True
else:
self.cond.wait(timeout)
return self.count == 0
finally:
self.cond.release()
def get_count(self):
try:
self.cond.acquire()
return self.count
finally:
self.cond.release()
def wait_for_complete_countdown(self, timeout=None):
try:
self.cond.acquire()
if self.count == 0:
return True
else:
self.cond.wait_for(lambda: self.count == 0, timeout)
return True
finally:
self.cond.release()
def get_count(self):
try:
self.cond.acquire()
return self.count
finally:
self.cond.release()

View File

@ -28,9 +28,9 @@ class TestConnectOptions(base.BaseTest):
try:
self.assertTrue(listener.wait(5))
self.assertEquals(1, len(listener.receipts),
self.assertEqual(1, len(listener.receipts),
'Missing receipt. Likely not connected')
self.assertEquals('implicit', listener.receipts[0]['headers']['receipt-id'])
self.assertEqual('implicit', listener.receipts[0]['headers']['receipt-id'])
finally:
new_conn.disconnect()
test_util.disable_implicit_connect()

View File

@ -36,7 +36,7 @@ class TestExchange(base.BaseTest):
ack="auto")
self.assertListener("Expecting an error", numErrs=1)
err = self.listener.errors[0]
self.assertEquals("not_found", err['headers']['message'])
self.assertEqual("not_found", err['headers']['message'])
self.assertRegex(err['message'], r'^NOT_FOUND')
time.sleep(1)
self.assertFalse(self.conn.is_connected())
@ -72,7 +72,7 @@ class TestQueue(base.BaseTest):
conn2.set_listener('', listener2)
self.subscribe_dest(conn2, destination, None, ack="auto")
self.assertTrue(listener2.wait(10), "no receive")
self.assertTrue(listener2.wait_for_complete_countdown(), "no receive")
finally:
conn2.disconnect()
@ -82,7 +82,7 @@ class TestQueue(base.BaseTest):
# send
self.conn.send(destination, "hello thar", receipt="foo")
self.listener.wait(3)
self.listener.wait_for_complete_countdown(3)
self.conn.disconnect()
# now receive
@ -92,7 +92,7 @@ class TestQueue(base.BaseTest):
conn2.set_listener('', listener2)
self.subscribe_dest(conn2, destination, None, ack="auto")
self.assertTrue(listener2.wait(10), "no receive")
self.assertTrue(listener2.wait_for_complete_countdown(), "no receive")
finally:
conn2.disconnect()
@ -112,10 +112,10 @@ class TestQueue(base.BaseTest):
## expect both consumers to get a message?
self.assertTrue(listener1.wait(2))
self.assertEquals(1, len(listener1.messages),
self.assertEqual(1, len(listener1.messages),
"unexpected message count")
self.assertTrue(listener2.wait(2))
self.assertEquals(1, len(listener2.messages),
self.assertEqual(1, len(listener2.messages),
"unexpected message count")
finally:
conn1.disconnect()
@ -153,7 +153,7 @@ class TestQueue(base.BaseTest):
self.assertListener("Missing messages/receipts", numMsgs=3, numRcts=2, timeout=3)
self.assertEquals(set(['a','b']), self.__gather_receipts())
self.assertEqual(set(['a','b']), self.__gather_receipts())
def test_interleaved_receipt_no_receipt_tx(self):
''' Test i-leaved receipt/no receipt, no-r bracketed by r+xactions '''
@ -177,7 +177,7 @@ class TestQueue(base.BaseTest):
expected = set(['a', 'b'])
missing = expected.difference(self.__gather_receipts())
self.assertEquals(set(), missing, "Missing receipts: " + str(missing))
self.assertEqual(set(), missing, "Missing receipts: " + str(missing))
def test_interleaved_receipt_no_receipt_inverse(self):
''' Test i-leaved receipt/no receipt, r bracketed by no-rs '''
@ -193,7 +193,7 @@ class TestQueue(base.BaseTest):
self.assertListener("Missing messages/receipt", numMsgs=3, numRcts=1, timeout=3)
self.assertEquals(set(['a']), self.__gather_receipts())
self.assertEqual(set(['a']), self.__gather_receipts())
def __test_send_receipt(self, destination, before, after, headers = {}):
count = 50
@ -209,12 +209,12 @@ class TestQueue(base.BaseTest):
receipt=receipt, headers=headers)
after()
self.assertTrue(self.listener.wait(5))
self.assertTrue(self.listener.wait_for_complete_countdown())
missing_receipts = expected_receipts.difference(
self.__gather_receipts())
self.assertEquals(set(), missing_receipts,
self.assertEqual(set(), missing_receipts,
"missing receipts: " + str(missing_receipts))
def __gather_receipts(self):
@ -248,11 +248,11 @@ class TestTopic(base.BaseTest):
self.conn.send(destination, "test2")
## expect both consumers to get both messages
self.assertTrue(listener1.wait(5))
self.assertEquals(2, len(listener1.messages),
self.assertTrue(listener1.wait_for_complete_countdown())
self.assertEqual(2, len(listener1.messages),
"unexpected message count")
self.assertTrue(listener2.wait(5))
self.assertEquals(2, len(listener2.messages),
self.assertTrue(listener2.wait_for_complete_countdown())
self.assertEqual(2, len(listener2.messages),
"unexpected message count")
finally:
conn1.disconnect()
@ -276,13 +276,13 @@ class TestTopic(base.BaseTest):
self.conn.send(destination, message)
self.assertTrue(listener1.wait(10))
self.assertEquals(2, len(listener1.messages),
self.assertEqual(2, len(listener1.messages),
"unexpected message count")
self.assertTrue(len(listener2.messages[0]['message']) == s,
"unexpected message size")
self.assertTrue(listener2.wait(10))
self.assertEquals(2, len(listener2.messages),
self.assertEqual(2, len(listener2.messages),
"unexpected message count")
finally:
conn1.disconnect()
@ -292,7 +292,7 @@ class TestReplyQueue(base.BaseTest):
def test_reply_queue(self):
''' Test with two separate clients. Client 1 sends
message to a known destination with a defined reply
a message to a known destination with a defined reply
queue. Client 2 receives on known destination and replies
on the reply destination. Client 1 gets the reply message'''
@ -307,54 +307,19 @@ class TestReplyQueue(base.BaseTest):
self.conn.send(known, "test",
headers = {"reply-to": reply})
self.assertTrue(listener2.wait(5))
self.assertEquals(1, len(listener2.messages))
self.assertTrue(listener2.wait_for_complete_countdown())
self.assertEqual(1, len(listener2.messages))
reply_to = listener2.messages[0]['headers']['reply-to']
self.assertTrue(reply_to.startswith('/reply-queue/'))
conn2.send(reply_to, "reply")
self.assertTrue(self.listener.wait(5))
self.assertEquals("reply", self.listener.messages[0]['message'])
self.assertTrue(self.listener.wait_for_complete_countdown())
self.assertEqual("reply", self.listener.messages[0]['message'])
finally:
conn2.disconnect()
def test_reuse_reply_queue(self):
''' Test re-use of reply-to queue '''
known2 = '/queue/known2'
known3 = '/queue/known3'
reply = '/temp-queue/foo'
def respond(cntn, listna):
self.assertTrue(listna.wait(5))
self.assertEquals(1, len(listna.messages))
reply_to = listna.messages[0]['headers']['reply-to']
self.assertTrue(reply_to.startswith('/reply-queue/'))
cntn.send(reply_to, "reply")
## Client 1 uses pre-supplied connection and listener
## Set up clients 2 and 3
conn2, listener2 = self.create_subscriber_connection(known2)
conn3, listener3 = self.create_subscriber_connection(known3)
try:
self.listener.reset(2)
self.conn.send(known2, "test2",
headers = {"reply-to": reply})
self.conn.send(known3, "test3",
headers = {"reply-to": reply})
respond(conn2, listener2)
respond(conn3, listener3)
self.assertTrue(self.listener.wait(5))
self.assertEquals(2, len(self.listener.messages))
self.assertEquals("reply", self.listener.messages[0]['message'])
self.assertEquals("reply", self.listener.messages[1]['message'])
finally:
conn2.disconnect()
conn3.disconnect()
def test_perm_reply_queue(self):
def test_durable_known_reply_queue(self):
'''As test_reply_queue, but with a non-temp reply queue'''
known = '/queue/known'
@ -369,15 +334,15 @@ class TestReplyQueue(base.BaseTest):
conn1.send(known, "test",
headers = {"reply-to": reply})
self.assertTrue(listener2.wait(5))
self.assertEquals(1, len(listener2.messages))
self.assertTrue(listener2.wait_for_complete_countdown())
self.assertEqual(1, len(listener2.messages))
reply_to = listener2.messages[0]['headers']['reply-to']
self.assertTrue(reply_to == reply)
conn2.send(reply_to, "reply")
self.assertTrue(listener1.wait(5))
self.assertEquals("reply", listener1.messages[0]['message'])
self.assertTrue(listener1.wait_for_complete_countdown())
self.assertEqual("reply", listener1.messages[0]['message'])
finally:
conn1.disconnect()
conn2.disconnect()
@ -401,20 +366,20 @@ class TestDurableSubscription(base.BaseTest):
if not listener:
listener = self.listener
self.assertTrue(listener.wait(5))
self.assertEquals(1, len(self.listener.receipts))
self.assertTrue(listener.wait_for_complete_countdown())
self.assertEqual(1, len(self.listener.receipts))
if pos is not None:
self.assertEquals(pos, self.listener.receipts[0]['msg_no'])
self.assertEqual(pos, self.listener.receipts[0]['msg_no'])
def __assert_message(self, msg, listener=None, pos=None):
if not listener:
listener = self.listener
self.assertTrue(listener.wait(5))
self.assertEquals(1, len(listener.messages))
self.assertEquals(msg, listener.messages[0]['message'])
self.assertTrue(listener.wait_for_complete_countdown())
self.assertEqual(1, len(listener.messages))
self.assertEqual(msg, listener.messages[0]['message'])
if pos is not None:
self.assertEquals(pos, self.listener.messages[0]['msg_no'])
self.assertEqual(pos, self.listener.messages[0]['msg_no'])
def do_test_durable_subscription(self, durability_header):
destination = '/topic/durable'
@ -451,8 +416,8 @@ class TestDurableSubscription(base.BaseTest):
# resubscribe and expect no message
self.__subscribe(destination)
self.assertTrue(self.listener.wait(3))
self.assertEquals(0, len(self.listener.messages))
self.assertEquals(1, len(self.listener.receipts))
self.assertEqual(0, len(self.listener.messages))
self.assertEqual(1, len(self.listener.receipts))
def test_durable_subscription(self):
self.do_test_durable_subscription('durable')
@ -475,12 +440,13 @@ class TestDurableSubscription(base.BaseTest):
self.listener.reset(100)
n = 100
# send 100 messages
for x in range(0, 100):
for x in range(0, n):
self.conn.send(destination, "msg" + str(x))
self.assertTrue(self.listener.wait(5))
self.assertEquals(100, len(self.listener.messages))
self.assertTrue(self.listener.wait_for_complete_countdown())
self.assertEqual(n, len(self.listener.messages))
finally:
conn2.disconnect()
@ -500,20 +466,20 @@ class TestDurableSubscription(base.BaseTest):
self.unsubscribe_dest(self.conn, destination, TestDurableSubscription.ID)
self.unsubscribe_dest(conn2, destination, "other.id")
self.listener.reset(101)
listener2.reset(101) ## 100 messages and 1 receipt
self.listener.reset(11)
listener2.reset(11) ## 10 messages and 1 receipt
# send 100 messages
for x in range(0, 100):
for x in range(0, 10):
self.conn.send(destination, "msg" + str(x))
self.__subscribe(destination)
self.__subscribe(destination, conn2, "other.id")
for l in [self.listener, listener2]:
self.assertTrue(l.wait(20))
self.assertTrue(len(l.messages) >= 90)
self.assertTrue(len(l.messages) <= 100)
self.assertTrue(l.wait_for_complete_countdown())
self.assertTrue(len(l.messages) >= 9)
self.assertTrue(len(l.messages) <= 10)
finally:
conn2.disconnect()
@ -524,11 +490,19 @@ class TestDurableSubscription(base.BaseTest):
self.conn.send_frame('SUBSCRIBE',
{'destination': destination, 'ack': 'auto', header: 'true'})
self.listener.wait(3)
self.assertEquals(1, len(self.listener.errors))
self.assertEquals("Missing Header", self.listener.errors[0]['headers']['message'])
self.assertEqual(1, len(self.listener.errors))
self.assertEqual("Missing Header", self.listener.errors[0]['headers']['message'])
def test_durable_subscribe_no_id(self):
self.do_test_durable_subscribe_no_id_and_header('durable')
def test_durable_subscribe_no_id_and_legacy_header(self):
self.do_test_durable_subscribe_no_id_and_header('persistent')
if __name__ == '__main__':
import test_runner
modules = [
__name__
]
test_runner.run_unittests(modules)

View File

@ -20,10 +20,10 @@ class TestErrorsAndCloseConnection(base.BaseTest):
self.assertTrue(self.listener.wait())
self.assertEquals(1, len(self.listener.errors))
self.assertEqual(1, len(self.listener.errors))
errorReceived = self.listener.errors[0]
self.assertEquals("Duplicated subscription identifier", errorReceived['headers']['message'])
self.assertEquals("A subscription identified by 'T_1' already exists.", errorReceived['message'])
self.assertEqual("Duplicated subscription identifier", errorReceived['headers']['message'])
self.assertEqual("A subscription identified by 'T_1' already exists.", errorReceived['message'])
time.sleep(2)
self.assertFalse(self.conn.is_connected())
@ -66,10 +66,10 @@ class TestErrors(base.BaseTest):
self.conn.send("/something/interesting", 'test_unknown_destination')
self.assertTrue(self.listener.wait())
self.assertEquals(1, len(self.listener.errors))
self.assertEqual(1, len(self.listener.errors))
err = self.listener.errors[0]
self.assertEquals("Unknown destination", err['headers']['message'])
self.assertEqual("Unknown destination", err['headers']['message'])
def test_send_missing_destination(self):
self.__test_missing_destination("SEND")
@ -82,20 +82,28 @@ class TestErrors(base.BaseTest):
self.conn.send_frame(command)
self.assertTrue(self.listener.wait())
self.assertEquals(1, len(self.listener.errors))
self.assertEqual(1, len(self.listener.errors))
err = self.listener.errors[0]
self.assertEquals("Missing destination", err['headers']['message'])
self.assertEqual("Missing destination", err['headers']['message'])
def __test_invalid_destination(self, dtype, content):
self.listener.reset()
self.conn.send("/" + dtype + content, '__test_invalid_destination:' + dtype + content)
self.assertTrue(self.listener.wait())
self.assertEquals(1, len(self.listener.errors))
self.assertEqual(1, len(self.listener.errors))
err = self.listener.errors[0]
self.assertEquals("Invalid destination", err['headers']['message'])
self.assertEquals("'" + content + "' is not a valid " +
self.assertEqual("Invalid destination", err['headers']['message'])
self.assertEqual("'" + content + "' is not a valid " +
dtype + " destination\n",
err['message'])
if __name__ == '__main__':
import test_runner
modules = [
__name__
]
test_runner.run_unittests(modules)

View File

@ -9,6 +9,7 @@ import unittest
import stomp
import base
import time
import threading
class TestLifecycle(base.BaseTest):
@ -24,12 +25,12 @@ class TestLifecycle(base.BaseTest):
def test_unsubscribe_queue_destination(self):
''' Test UNSUBSCRIBE command with queue'''
d = "/queue/unsub01"
d = "/queue/test_unsubscribe_queue_destination"
self.unsub_test(d, self.sub_and_send(d))
def test_unsubscribe_queue_destination_with_receipt(self):
''' Test receipted UNSUBSCRIBE command with queue'''
d = "/queue/unsub02"
d = "/queue/test_unsubscribe_queue_destination_with_receipt"
self.unsub_test(d, self.sub_and_send(d, receipt="unsub.rct"), numRcts=1)
def test_unsubscribe_exchange_id(self):
@ -44,12 +45,12 @@ class TestLifecycle(base.BaseTest):
def test_unsubscribe_queue_id(self):
''' Test UNSUBSCRIBE command with queue by id'''
d = "/queue/unsub03"
d = "/queue/test_unsubscribe_queue_id"
self.unsub_test(d, self.sub_and_send(d, subid="queid"))
def test_unsubscribe_queue_id_with_receipt(self):
''' Test receipted UNSUBSCRIBE command with queue by id'''
d = "/queue/unsub04"
d = "/queue/unsutest_unsubscribe_queue_id_with_receiptb04"
self.unsub_test(d, self.sub_and_send(d, subid="queid", receipt="unsub.rct"), numRcts=1)
def test_connect_version_1_0(self):
@ -78,9 +79,6 @@ class TestLifecycle(base.BaseTest):
self.assertTrue(new_conn.is_connected())
finally:
new_conn.disconnect()
# connections are closed asynchronously, e.g. a few seconds later
time.sleep(6)
self.assertFalse(new_conn.is_connected())
def test_unsupported_version(self):
''' Test unsupported version on CONNECT command'''
@ -106,7 +104,7 @@ class TestLifecycle(base.BaseTest):
try:
new_conn.connect(user, passcode)
self.assertTrue(listener.wait())
self.assertEquals(expected, listener.errors[0]['message'])
self.assertEqual(expected, listener.errors[0]['message'])
finally:
if new_conn.is_connected():
new_conn.disconnect()
@ -116,10 +114,10 @@ class TestLifecycle(base.BaseTest):
self.listener.reset(1)
self.conn.send_frame("SEND", {"destination":"a", "message-id":"1"})
self.assertTrue(self.listener.wait())
self.assertEquals(1, len(self.listener.errors))
self.assertEqual(1, len(self.listener.errors))
errorReceived = self.listener.errors[0]
self.assertEquals("Invalid header", errorReceived['headers']['message'])
self.assertEquals("'message-id' is not allowed on 'SEND'.\n", errorReceived['message'])
self.assertEqual("Invalid header", errorReceived['headers']['message'])
self.assertEqual("'message-id' is not allowed on 'SEND'.\n", errorReceived['message'])
def test_send_recv_header(self):
''' Test sending a custom header and receiving it back '''
@ -129,16 +127,14 @@ class TestLifecycle(base.BaseTest):
'custom-header-3': 'value3'}
self.listener.reset(1)
recv_hdrs = self.simple_test_send_rec(dest, headers=hdrs)
self.assertEquals('value1', recv_hdrs['x-custom-header-1'])
self.assertEquals('value2', recv_hdrs['x-custom-header-2'])
self.assertEquals('value3', recv_hdrs['custom-header-3'])
self.assertEqual('value1', recv_hdrs['x-custom-header-1'])
self.assertEqual('value2', recv_hdrs['x-custom-header-2'])
self.assertEqual('value3', recv_hdrs['custom-header-3'])
def test_disconnect(self):
''' Test DISCONNECT command'''
self.conn.disconnect()
# connections are closed asynchronously, e.g. a few seconds later
time.sleep(7)
self.assertFalse(self.conn.is_connected())
self.await_condition(lambda: not self.conn.is_connected())
def test_disconnect_with_receipt(self):
''' Test the DISCONNECT command with receipts '''
@ -146,9 +142,9 @@ class TestLifecycle(base.BaseTest):
self.listener.reset(1)
self.conn.send_frame("DISCONNECT", {"receipt": "test"})
self.assertTrue(self.listener.wait())
self.assertEquals(1, len(self.listener.receipts))
self.assertEqual(1, len(self.listener.receipts))
receiptReceived = self.listener.receipts[0]['headers']['receipt-id']
self.assertEquals("test", receiptReceived
self.assertEqual("test", receiptReceived
, "Wrong receipt received: '" + receiptReceived + "'")
def unsub_test(self, dest, verbs, numRcts=0):
@ -165,6 +161,7 @@ class TestLifecycle(base.BaseTest):
def sub_and_send(self, dest, subid=None, receipt=None):
def subfun():
self.subscribe_dest(self.conn, dest, subid)
time.sleep(1)
self.conn.send(dest, "test")
def unsubfun():
headers = {}
@ -172,3 +169,10 @@ class TestLifecycle(base.BaseTest):
headers['receipt'] = receipt
self.unsubscribe_dest(self.conn, dest, subid, **headers)
return subfun, unsubfun
if __name__ == '__main__':
import test_runner
modules = [
__name__
]
test_runner.run_unittests(modules)

View File

@ -329,3 +329,11 @@ class TestParsing(unittest.TestCase):
self.assertEqual(bodybuf, bodyresp,
" body ('%s')\nincorrectly returned as ('%s')"
% (bodyresp, bodybuf))
if __name__ == '__main__':
import test_runner
modules = [
__name__
]
test_runner.run_unittests(modules)

View File

@ -85,3 +85,11 @@ class TestQueueProperties(base.BaseTest):
self.conn.disconnect()
connection.close()
if __name__ == '__main__':
import test_runner
modules = [
__name__
]
test_runner.run_unittests(modules)

View File

@ -20,8 +20,8 @@ class TestRedelivered(base.BaseTest):
self.conn.send(destination, "test1")
message_receive_timeout = 30
self.assertTrue(self.listener.wait(message_receive_timeout), "Test message not received within {0} seconds".format(message_receive_timeout))
self.assertEquals(1, len(self.listener.messages))
self.assertEquals('false', self.listener.messages[0]['headers']['redelivered'])
self.assertEqual(1, len(self.listener.messages))
self.assertEqual('false', self.listener.messages[0]['headers']['redelivered'])
# disconnect with no ack
self.conn.disconnect()
@ -34,7 +34,14 @@ class TestRedelivered(base.BaseTest):
conn2.set_listener('', listener2)
self.subscribe_dest(conn2, destination, None, ack='client')
self.assertTrue(listener2.wait(), "message not received again")
self.assertEquals(1, len(listener2.messages))
self.assertEquals('true', listener2.messages[0]['headers']['redelivered'])
self.assertEqual(1, len(listener2.messages))
self.assertEqual('true', listener2.messages[0]['headers']['redelivered'])
finally:
conn2.disconnect()
if __name__ == '__main__':
import test_runner
modules = [
__name__
]
test_runner.run_unittests(modules)

View File

@ -32,7 +32,7 @@ class TestReliability(base.BaseTest):
pub_conn.disconnect()
if listener.wait(30):
self.assertEquals(count, len(listener.messages))
self.assertEqual(count, len(listener.messages))
else:
listener.print_state("Final state of listener:")
self.fail("Did not receive %s messages in time" % count)

View File

@ -66,7 +66,7 @@ class TestSslClient(unittest.TestCase):
self.assertTrue(listener.wait(1))
self.assertEquals("sub",
self.assertEqual("sub",
listener.receipts[0]['headers']['receipt-id'])
listener.reset(1)
@ -74,6 +74,6 @@ class TestSslClient(unittest.TestCase):
self.assertTrue(listener.wait())
self.assertEquals("Hello SSL!", listener.messages[0]['message'])
self.assertEqual("Hello SSL!", listener.messages[0]['message'])
finally:
conn.disconnect()

View File

@ -50,3 +50,11 @@ class TestTopicPermissions(base.BaseTest):
# assert errors
self.assertGreater(len(self.listener.errors), 0)
self.assertIn("ACCESS_REFUSED", self.listener.errors[0]['message'])
if __name__ == '__main__':
import test_runner
modules = [
__name__
]
test_runner.run_unittests(modules)

View File

@ -25,16 +25,16 @@ class TestTransactions(base.BaseTest):
## should see the second message
self.assertTrue(self.listener.wait(3))
self.assertEquals(1, len(self.listener.messages))
self.assertEquals("again!", self.listener.messages[0]['message'])
self.assertEqual(1, len(self.listener.messages))
self.assertEqual("again!", self.listener.messages[0]['message'])
## now look for the first message
self.listener.reset()
self.conn.commit(transaction=tx)
self.assertTrue(self.listener.wait(3))
self.assertEquals(1, len(self.listener.messages),
self.assertEqual(1, len(self.listener.messages),
"Missing committed message")
self.assertEquals("hello!", self.listener.messages[0]['message'])
self.assertEqual("hello!", self.listener.messages[0]['message'])
def test_tx_abort(self):
''' Test TX with an ABORT and ensure messages are discarded '''
@ -49,13 +49,13 @@ class TestTransactions(base.BaseTest):
## should see the second message
self.assertTrue(self.listener.wait(3))
self.assertEquals(1, len(self.listener.messages))
self.assertEquals("again!", self.listener.messages[0]['message'])
self.assertEqual(1, len(self.listener.messages))
self.assertEqual("again!", self.listener.messages[0]['message'])
## now look for the first message to be discarded
self.listener.reset()
self.conn.abort(transaction=tx)
self.assertFalse(self.listener.wait(3))
self.assertEquals(0, len(self.listener.messages),
self.assertEqual(0, len(self.listener.messages),
"Unexpected committed message")

View File

@ -37,7 +37,7 @@ class TestUserGeneratedQueueName(base.BaseTest):
# check if we receive the message from the STOMP subscription
self.assertTrue(self.listener.wait(2), "initial message not received")
self.assertEquals(1, len(self.listener.messages))
self.assertEqual(1, len(self.listener.messages))
self.conn.disconnect()
connection.close()
@ -65,7 +65,15 @@ class TestUserGeneratedQueueName(base.BaseTest):
# check if we receive the message from the STOMP subscription
self.assertTrue(self.listener.wait(2), "initial message not received")
self.assertEquals(1, len(self.listener.messages))
self.assertEqual(1, len(self.listener.messages))
self.conn.disconnect()
connection.close()
if __name__ == '__main__':
import test_runner
modules = [
__name__
]
test_runner.run_unittests(modules)

View File

@ -55,8 +55,15 @@ class TestUserGeneratedQueueName(base.BaseTest):
if quorum_queue_supported:
# check if we receive the message from the STOMP subscription
self.assertTrue(self.listener.wait(5), "initial message not received")
self.assertEquals(1, len(self.listener.messages))
self.assertTrue(self.listener.wait_for_complete_countdown(), "initial message not received")
self.assertEqual(1, len(self.listener.messages))
self.conn.disconnect()
connection.close()
if __name__ == '__main__':
import test_runner
modules = [
__name__
]
test_runner.run_unittests(modules)

View File

@ -58,7 +58,15 @@ class TestUserGeneratedQueueName(base.BaseTest):
if stream_queue_supported:
# check if we receive the message from the STOMP subscription
self.assertTrue(self.listener.wait(5), "initial message not received")
self.assertEquals(1, len(self.listener.messages))
self.assertEqual(1, len(self.listener.messages))
self.conn.disconnect()
connection.close()
if __name__ == '__main__':
import test_runner
modules = [
__name__
]
test_runner.run_unittests(modules)