Added receipted unsubscribe tests and check for send after unsubscribe properly.
This commit is contained in:
parent
6f8e904f9b
commit
55f2e7eb95
|
|
@ -203,7 +203,7 @@ handle_frame(Command, _Frame, State) ->
|
|||
%% Internal helpers for processing frames callbacks
|
||||
%%----------------------------------------------------------------------------
|
||||
|
||||
cancel_subscription(missing, State) ->
|
||||
cancel_subscription(ConsumerTag, State) when ConsumerTag == missing ->
|
||||
error("Missing destination or id",
|
||||
"UNSUBSCRIBE must include a 'destination' or 'id' header\n",
|
||||
State);
|
||||
|
|
|
|||
|
|
@ -48,7 +48,7 @@ class BaseTest(unittest.TestCase):
|
|||
self.assertEquals("foo", msg['message'])
|
||||
self.assertEquals(dest, msg['headers']['destination'])
|
||||
|
||||
def assertListener(self, errMsg, numMsgs=0, numErrs=0, numRcts=0, timeout=3):
|
||||
def assertListener(self, errMsg, numMsgs=0, numErrs=0, numRcts=0, timeout=1):
|
||||
if numMsgs + numErrs + numRcts > 0:
|
||||
self.assertTrue(self.listener.await(timeout), errMsg + " (#awaiting)")
|
||||
else:
|
||||
|
|
@ -57,7 +57,7 @@ class BaseTest(unittest.TestCase):
|
|||
self.assertEquals(numErrs, len(self.listener.errors), errMsg + " (#errors)")
|
||||
self.assertEquals(numRcts, len(self.listener.receipts), errMsg + " (#receipts)")
|
||||
|
||||
def assertListenerAfter(self, verb, errMsg="", numMsgs=0, numErrs=0, numRcts=0, timeout=3):
|
||||
def assertListenerAfter(self, verb, errMsg="", numMsgs=0, numErrs=0, numRcts=0, timeout=1):
|
||||
num = numMsgs + numErrs + numRcts
|
||||
self.listener.reset(num if num>0 else 1)
|
||||
verb()
|
||||
|
|
|
|||
|
|
@ -7,152 +7,74 @@ class TestLifecycle(base.BaseTest):
|
|||
|
||||
def test_unsubscribe_exchange_destination(self):
|
||||
''' Test UNSUBSCRIBE command with exchange'''
|
||||
self.unsub_test(self.sub_and_send("/exchange/amq.fanout"))
|
||||
d = "/exchange/amq.fanout"
|
||||
self.unsub_test(d, self.sub_and_send(d))
|
||||
|
||||
def test_unsubscribe_exchange_destination_with_receipt(self):
|
||||
''' Test receipted UNSUBSCRIBE command with exchange'''
|
||||
d = "/exchange/amq.fanout"
|
||||
|
||||
# subscribe and send message
|
||||
self.listener.reset()
|
||||
self.conn.subscribe(destination=d)
|
||||
self.conn.send("test", destination=d)
|
||||
self.assertTrue(self.listener.await())
|
||||
self.assertEquals(1, len(self.listener.messages))
|
||||
self.assertEquals(0, len(self.listener.errors))
|
||||
|
||||
# unsubscribe and send with RECEIPTs
|
||||
self.listener.reset()
|
||||
self.conn.unsubscribe(destination=d, receipt="unsub.receipt")
|
||||
self.assertTrue(self.listener.await(1),
|
||||
"No RECEIPT received on UNSUBSCRIBE")
|
||||
self.assertEquals(1, len(self.listener.receipts),
|
||||
"Expected a receipt on UNSUBSCRIBE")
|
||||
|
||||
self.listener.reset(2) # RECEIPT and possibly message
|
||||
self.conn.send("test", destination=d, receipt="send.receipt")
|
||||
self.assertFalse(self.listener.await(3),
|
||||
"UNSUBSCRIBE exchange failed, still receiving messages")
|
||||
self.assertEquals(1, len(self.listener.receipts),
|
||||
"Expected a receipt on SEND")
|
||||
self.unsub_test(d, self.sub_and_send(d, receipt="unsub.rct"), numRcts=1)
|
||||
|
||||
def test_unsubscribe_queue_destination(self):
|
||||
''' Test UNSUBSCRIBE command with queue'''
|
||||
self.unsub_test(self.sub_and_send("/queue/unsub01"))
|
||||
d = "/queue/unsub01"
|
||||
self.unsub_test(d, self.sub_and_send(d))
|
||||
|
||||
def test_unsubscribe_queue_destination_with_receipt(self):
|
||||
''' Test receipted UNSUBSCRIBE command with queue'''
|
||||
d = "/queue/unsub03"
|
||||
|
||||
# subscribe and send message
|
||||
self.listener.reset()
|
||||
self.conn.subscribe(destination=d)
|
||||
self.conn.send("test", destination=d)
|
||||
self.assertTrue(self.listener.await())
|
||||
self.assertEquals(1, len(self.listener.messages))
|
||||
self.assertEquals(0, len(self.listener.errors))
|
||||
|
||||
# unsubscribe and send with RECEIPTs
|
||||
self.listener.reset()
|
||||
self.conn.unsubscribe(destination=d, receipt="unsub.receipt")
|
||||
self.assertTrue(self.listener.await(1),
|
||||
"No RECEIPT received on UNSUBSCRIBE")
|
||||
self.assertEquals(1, len(self.listener.receipts),
|
||||
"Expected a receipt on UNSUBSCRIBE")
|
||||
|
||||
self.listener.reset(2) # RECEIPT and possibly message
|
||||
self.conn.send("test", destination=d, receipt="send.receipt")
|
||||
self.assertFalse(self.listener.await(3),
|
||||
"UNSUBSCRIBE queue failed, still receiving messages")
|
||||
self.assertEquals(1, len(self.listener.receipts),
|
||||
"Expected a receipt on SEND")
|
||||
d = "/queue/unsub02"
|
||||
self.unsub_test(d, self.sub_and_send(d, receipt="unsub.rct"), numRcts=1)
|
||||
|
||||
def test_unsubscribe_exchange_id(self):
|
||||
''' Test UNSUBSCRIBE command with exchange by id'''
|
||||
self.unsub_test(self.subid_and_send("/exchange/amq.fanout", "exchid"))
|
||||
d = "/exchange/amq.fanout"
|
||||
self.unsub_test(d, self.sub_and_send(d, subid="exchid"))
|
||||
|
||||
def test_unsubscribe_exchange_id_with_receipt(self):
|
||||
''' Test receipted UNSUBSCRIBE command with exchange by id'''
|
||||
d = "/exchange/amq.fanout"
|
||||
|
||||
# subscribe and send message
|
||||
self.listener.reset()
|
||||
self.conn.subscribe(destination=d, id="exchid")
|
||||
self.conn.send("test", destination=d)
|
||||
self.assertTrue(self.listener.await())
|
||||
self.assertEquals(1, len(self.listener.messages))
|
||||
self.assertEquals(0, len(self.listener.errors))
|
||||
|
||||
# unsubscribe and send with RECEIPTs
|
||||
self.listener.reset()
|
||||
self.conn.unsubscribe(id="exchid", receipt="unsub.receipt")
|
||||
self.assertTrue(self.listener.await(1),
|
||||
"No RECEIPT received on UNSUBSCRIBE")
|
||||
self.assertEquals(1, len(self.listener.receipts),
|
||||
"Expected a receipt on UNSUBSCRIBE")
|
||||
|
||||
self.listener.reset(2) # RECEIPT and possibly message
|
||||
self.conn.send("test", destination=d, receipt="send.receipt")
|
||||
self.assertFalse(self.listener.await(3),
|
||||
"UNSUBSCRIBE exchange failed, still receiving messages")
|
||||
self.assertEquals(1, len(self.listener.receipts),
|
||||
"Expected a receipt on SEND")
|
||||
self.unsub_test(d, self.sub_and_send(d, subid="exchid", receipt="unsub.rct"), numRcts=1)
|
||||
|
||||
def test_unsubscribe_queue_id(self):
|
||||
''' Test UNSUBSCRIBE command with queue by id'''
|
||||
self.unsub_test(self.subid_and_send("/queue/unsub02", "queid"))
|
||||
d = "/queue/unsub03"
|
||||
self.unsub_test(d, self.sub_and_send(d, subid="queid"))
|
||||
|
||||
def test_unsubscribe_queue_id_with_receipt(self):
|
||||
''' Test receipted UNSUBSCRIBE command with queue by id'''
|
||||
d = "/queue/unsub04"
|
||||
|
||||
# subscribe and send message
|
||||
self.listener.reset()
|
||||
self.conn.subscribe(destination=d, id="queid")
|
||||
self.conn.send("test", destination=d)
|
||||
self.assertTrue(self.listener.await())
|
||||
self.assertEquals(1, len(self.listener.messages))
|
||||
self.assertEquals(0, len(self.listener.errors))
|
||||
|
||||
# unsubscribe and send with RECEIPTs
|
||||
self.listener.reset()
|
||||
self.conn.unsubscribe(id="queid", receipt="unsub.receipt")
|
||||
self.assertTrue(self.listener.await(1),
|
||||
"No RECEIPT received on UNSUBSCRIBE")
|
||||
self.assertEquals(1, len(self.listener.receipts),
|
||||
"Expected a receipt on UNSUBSCRIBE")
|
||||
|
||||
self.listener.reset(2) # RECEIPT and possibly message
|
||||
self.conn.send("test", destination=d, receipt="send.receipt")
|
||||
self.assertFalse(self.listener.await(3),
|
||||
"UNSUBSCRIBE queue by id failed, still receiving messages")
|
||||
self.assertEquals(1, len(self.listener.receipts),
|
||||
"Expected a receipt on SEND")
|
||||
self.unsub_test(d, self.sub_and_send(d, subid="queid", receipt="unsub.rct"), numRcts=1)
|
||||
|
||||
def test_disconnect(self):
|
||||
''' Run DISCONNECT command '''
|
||||
self.conn.disconnect()
|
||||
self.assertFalse(self.conn.is_connected())
|
||||
|
||||
def unsub_test(self, verbs):
|
||||
def unsub_test(self, dest, verbs, numRcts=0):
|
||||
def afterfun():
|
||||
self.conn.send("after-test", destination=dest)
|
||||
subverb, unsubverb = verbs
|
||||
self.assertListenerAfter(subverb,
|
||||
numMsgs=1, errMsg="FAILED to subscribe and send")
|
||||
self.assertListenerAfter(unsubverb,
|
||||
self.assertListenerAfter(subverb, numMsgs=1,
|
||||
errMsg="FAILED to subscribe and send")
|
||||
self.assertListenerAfter(unsubverb, numRcts=numRcts,
|
||||
errMsg="Incorrect responses from UNSUBSCRIBE")
|
||||
self.assertListenerAfter(afterfun,
|
||||
errMsg="Still receiving messages")
|
||||
|
||||
def subid_and_send(self, dest, subid):
|
||||
def sub_and_send(self, dest, subid="", receipt=""):
|
||||
def subfun():
|
||||
self.conn.subscribe(destination=dest, id=subid)
|
||||
if subid=="":
|
||||
self.conn.subscribe(destination=dest)
|
||||
else:
|
||||
self.conn.subscribe(destination=dest, id=subid)
|
||||
self.conn.send("test", destination=dest)
|
||||
def unsubfun():
|
||||
self.conn.unsubscribe(id=subid)
|
||||
return subfun, unsubfun
|
||||
|
||||
def sub_and_send(self, dest):
|
||||
def subfun():
|
||||
self.conn.subscribe(destination=dest)
|
||||
self.conn.send("test", destination=dest)
|
||||
def unsubfun():
|
||||
self.conn.unsubscribe(destination=dest)
|
||||
if subid=="" and receipt=="":
|
||||
self.conn.unsubscribe(destination=dest)
|
||||
elif receipt=="":
|
||||
self.conn.unsubscribe(id=subid)
|
||||
elif subid=="":
|
||||
self.conn.unsubscribe(destination=dest, receipt=receipt)
|
||||
else:
|
||||
self.conn.unsubscribe(id=subid, receipt=receipt)
|
||||
return subfun, unsubfun
|
||||
|
|
|
|||
Loading…
Reference in New Issue