Merge branch 'stable'

This commit is contained in:
Michael Klishin 2015-09-16 19:00:57 +03:00
commit e04f811092
2 changed files with 72 additions and 35 deletions

View File

@ -592,7 +592,16 @@ do_subscribe(Destination, DestHdr, Frame,
_ -> amqp_channel:call(
Channel, #'basic.qos'{prefetch_count = Prefetch})
end,
ExchangeAndKey = rabbit_routing_util:parse_routing(Destination),
case dict:find(ConsumerTag, Subs) of
{ok, _} ->
Message = "Duplicated subscription identifier",
Detail = "A subscription identified by '~s' alredy exists.",
error(Message, Detail, [ConsumerTag], State),
send_error(Message, Detail, [ConsumerTag], State),
{stop, normal, close_connection(State)};
error ->
ExchangeAndKey =
rabbit_routing_util:parse_routing(Destination),
try
amqp_channel:subscribe(Channel,
#'basic.consume'{
@ -606,8 +615,8 @@ do_subscribe(Destination, DestHdr, Frame,
ok = rabbit_routing_util:ensure_binding(
Queue, ExchangeAndKey, Channel)
catch exit:Err ->
%% it's safe to delete this queue, it was server-named
%% and declared by us
%% it's safe to delete this queue, it
%% was server-named and declared by us
case Destination of
{exchange, _} ->
ok = maybe_clean_up_queue(Queue, State);
@ -626,7 +635,8 @@ do_subscribe(Destination, DestHdr, Frame,
multi_ack = IsMulti,
description = Description},
Subs),
route_state = RouteState1});
route_state = RouteState1})
end;
{error, _} = Err ->
Err
end.

View File

@ -3,6 +3,34 @@ import stomp
import base
import time
class TestErrorsAndCloseConnection(base.BaseTest):
def __test_duplicate_consumer_tag_with_headers(self, destination, headers):
self.subscribe_dest(self.conn, destination, None,
headers = headers)
self.subscribe_dest(self.conn, destination, None,
headers = headers)
self.assertTrue(self.listener.await())
self.assertEquals(1, len(self.listener.errors))
errorReceived = self.listener.errors[0]
self.assertEquals("Duplicated subscription identifier", errorReceived['headers']['message'])
self.assertEquals("A subscription identified by 'T_1' alredy exists.", errorReceived['message'])
time.sleep(2)
self.assertFalse(self.conn.is_connected())
def test_duplicate_consumer_tag_with_transient_destination(self):
destination = "/exchange/amq.direct/duplicate-consumer-tag-test1"
self.__test_duplicate_consumer_tag_with_headers(destination, {'id': 1})
def test_duplicate_consumer_tag_with_durable_destination(self):
destination = "/queue/duplicate-consumer-tag-test2"
self.__test_duplicate_consumer_tag_with_headers(destination, {'id': 1,
'persistent': True})
class TestErrors(base.BaseTest):
def test_invalid_queue_destination(self):
@ -64,4 +92,3 @@ class TestErrors(base.BaseTest):
self.assertEquals("'" + content + "' is not a valid " +
dtype + " destination\n",
err['message'])