rabbitmq-server/deps/rabbit/test/temp/head_message_timestamp_test...

132 lines
5.1 KiB
Python
Executable File

#!/usr/bin/python
#
# Tests for the SLA patch which adds the head_message_timestamp queue stat.
# Uses both the management interface via rabbitmqadmin and the AMQP interface via Pika.
# There's no particular reason to have used rabbitmqadmin other than saving some bulk.
# Similarly, the separate declaration of exchanges and queues is just a preference
# following a typical enterprise policy where admin users create these resources.
from datetime import datetime
import json
import pika
import os
import sys
from time import clock, mktime, sleep
import unittest
# Uses the rabbitmqadmin script.
# To be imported this must be given a .py suffix and placed on the Python path
from rabbitmqadmin import *
TEXCH = 'head-message-timestamp-test'
TQUEUE = 'head-message-timestamp-test-queue'
TIMEOUT_SECS = 10
TIMESTAMP1 = mktime(datetime(2010,1,1,12,00,01).timetuple())
TIMESTAMP2 = mktime(datetime(2010,1,1,12,00,02).timetuple())
AMQP_PORT = 99
DELIVERY_MODE = 2
DURABLE = False
def log(msg):
print("\nINFO: " + msg)
class RabbitTestCase(unittest.TestCase):
def setUp(self):
parser.set_conflict_handler('resolve')
(options, args) = make_configuration()
AMQP_PORT = int(options.port) - 10000
self.mgmt = Management(options, args)
self.mgmt.put('/exchanges/%2f/' + TEXCH, '{"type" : "fanout", "durable":' + str(DURABLE).lower() + '}')
self.mgmt.put('/queues/%2f/' + TQUEUE, '{"auto_delete":false,"durable":' + str(DURABLE).lower() + ',"arguments":[]}')
self.mgmt.post('/bindings/%2f/e/' + TEXCH + '/q/' + TQUEUE, '{"routing_key": ".*", "arguments":[]}')
self.credentials = pika.PlainCredentials(options.username, options.password)
parameters = pika.ConnectionParameters(options.hostname, port=AMQP_PORT, credentials=self.credentials)
self.connection = pika.BlockingConnection(parameters)
self.channel = self.connection.channel()
def tearDown(self):
parser.set_conflict_handler('resolve')
(options, args) = make_configuration()
self.mgmt = Management(options, args)
self.mgmt.delete('/queues/%2f/' + TQUEUE)
self.mgmt.delete('/exchanges/%2f/' + TEXCH)
class RabbitSlaTestCase(RabbitTestCase):
def get_queue_stats(self, queue_name):
stats_str = self.mgmt.get('/queues/%2f/' + queue_name)
return json.loads(stats_str)
def get_head_message_timestamp(self, queue_name):
return self.get_queue_stats(queue_name)["head_message_timestamp"]
def send(self, message, timestamp=None):
self.channel.basic_publish(TEXCH, '', message,
pika.BasicProperties(content_type='text/plain',
delivery_mode=DELIVERY_MODE,
timestamp=timestamp))
log("Sent message with body: " + str(message))
def receive(self, queue):
method_frame, header_frame, body = self.channel.basic_get(queue = queue)
log("Received message with body: " + str(body))
return method_frame.delivery_tag, body
def ack(self, delivery_tag):
self.channel.basic_ack(delivery_tag)
def nack(self, delivery_tag):
self.channel.basic_nack(delivery_tag)
def wait_for_new_timestamp(self, queue, old_timestamp):
stats_wait_start = clock()
while ((clock() - stats_wait_start) < TIMEOUT_SECS and
self.get_head_message_timestamp(queue) == old_timestamp):
sleep(0.1)
log('Queue stats updated in ' + str(clock() - stats_wait_start) + ' secs.')
return self.get_head_message_timestamp(queue)
# TESTS
def test_no_timestamp_when_queue_is_empty(self):
assert self.get_head_message_timestamp(TQUEUE) == ''
def test_has_timestamp_when_first_msg_is_added(self):
self.send('Msg1', TIMESTAMP1)
stats_timestamp = self.wait_for_new_timestamp(TQUEUE, '')
assert stats_timestamp == TIMESTAMP1
def test_no_timestamp_when_last_msg_is_removed(self):
self.send('Msg1', TIMESTAMP1)
stats_timestamp = self.wait_for_new_timestamp(TQUEUE, '')
tag, body = self.receive(TQUEUE)
self.ack(tag)
stats_timestamp = self.wait_for_new_timestamp(TQUEUE, TIMESTAMP1)
assert stats_timestamp == ''
def test_timestamp_updated_when_msg_is_removed(self):
self.send('Msg1', TIMESTAMP1)
stats_timestamp = self.wait_for_new_timestamp(TQUEUE, '')
self.send('Msg2', TIMESTAMP2)
tag, body = self.receive(TQUEUE)
self.ack(tag)
stats_timestamp = self.wait_for_new_timestamp(TQUEUE, TIMESTAMP1)
assert stats_timestamp == TIMESTAMP2
def test_timestamp_not_updated_before_msg_is_acked(self):
self.send('Msg1', TIMESTAMP1)
stats_timestamp = self.wait_for_new_timestamp(TQUEUE, '')
tag, body = self.receive(TQUEUE)
sleep(1) # Allow time for update to appear if it was going to (it shouldn't)
assert self.get_head_message_timestamp(TQUEUE) == TIMESTAMP1
self.ack(tag)
if __name__ == '__main__':
unittest.main(verbosity = 2)