Merge branch 'stable'

This commit is contained in:
Michael Klishin 2015-08-20 17:56:35 +03:00
commit 5b8ba91bbb
12 changed files with 180 additions and 38 deletions

View File

@ -14,3 +14,5 @@ erl_crash.dump
/tmp/
/deps/stomppy/stomppy/
/deps/stomppy/stomppy-git/
/deps/pika/pika/
/deps/pika/pika-git/

27
deps/rabbitmq_stomp/deps/pika/Makefile vendored Normal file
View File

@ -0,0 +1,27 @@
UPSTREAM_GIT=https://github.com/pika/pika.git
REVISION=0.9.14
LIB_DIR=pika
CHECKOUT_DIR=pika-git
TARGETS=$(LIB_DIR)
all: $(TARGETS)
clean:
rm -rf $(LIB_DIR)
distclean: clean
rm -rf $(CHECKOUT_DIR)
$(LIB_DIR) : $(CHECKOUT_DIR)
rm -rf $@
cp -R $< $@
$(CHECKOUT_DIR):
git clone $(UPSTREAM_GIT) $@
(cd $@ && git checkout $(REVISION)) || rm -rf $@
echo-revision:
@echo $(REVISION)

View File

@ -43,9 +43,26 @@
-define(HEADER_TYPE, "type").
-define(HEADER_USER_ID, "user-id").
-define(HEADER_VERSION, "version").
-define(HEADER_X_DEAD_LETTER_EXCHANGE, "x-dead-letter-exchange").
-define(HEADER_X_DEAD_LETTER_ROUTING_KEY, "x-dead-letter-routing-key").
-define(HEADER_X_EXPIRES, "x-expires").
-define(HEADER_X_MAX_LENGTH, "x-max-length").
-define(HEADER_X_MAX_LENGTH_BYTES, "x-max-length-bytes").
-define(HEADER_X_MAX_PRIORITY, "x-max-priority").
-define(HEADER_X_MESSAGE_TTL, "x-message-ttl").
-define(MESSAGE_ID_SEPARATOR, "@@").
-define(HEADERS_NOT_ON_SEND, [?HEADER_MESSAGE_ID]).
-define(TEMP_QUEUE_ID_PREFIX, "/temp-queue/").
-define(HEADER_ARGUMENTS, [
?HEADER_X_DEAD_LETTER_EXCHANGE,
?HEADER_X_DEAD_LETTER_ROUTING_KEY,
?HEADER_X_EXPIRES,
?HEADER_X_MAX_LENGTH,
?HEADER_X_MAX_LENGTH_BYTES,
?HEADER_X_MAX_PRIORITY,
?HEADER_X_MESSAGE_TTL
]).

View File

@ -1,8 +1,8 @@
RELEASABLE:=true
DEPS:=rabbitmq-server rabbitmq-erlang-client rabbitmq-test
#STANDALONE_TEST_COMMANDS:=eunit:test([rabbit_stomp_test_util,rabbit_stomp_test_frame],[verbose])
STANDALONE_TEST_COMMANDS:=eunit:test([rabbit_stomp_test_util,rabbit_stomp_test_frame],[verbose])
WITH_BROKER_TEST_SCRIPTS:=$(PACKAGE_DIR)/test/src/test.py $(PACKAGE_DIR)/test/src/test_connect_options.py $(PACKAGE_DIR)/test/src/test_ssl.py
#WITH_BROKER_TEST_COMMANDS:=rabbit_stomp_test:all_tests() rabbit_stomp_amqqueue_test:all_tests()
WITH_BROKER_TEST_COMMANDS:=rabbit_stomp_test:all_tests() rabbit_stomp_amqqueue_test:all_tests()
WITH_BROKER_TEST_CONFIG:=$(PACKAGE_DIR)/test/ebin/test
define package_rules
@ -14,11 +14,13 @@ $(PACKAGE_DIR)+pre-test::
sed -e "s|%%CERTS_DIR%%|$(abspath $(PACKAGE_DIR))/test/certs|g" < $(PACKAGE_DIR)/test/src/test.config > $(PACKAGE_DIR)/test/ebin/test.config
$(MAKE) -C $(PACKAGE_DIR)/../rabbitmq-test/certs all PASSWORD=test DIR=$(abspath $(PACKAGE_DIR))/test/certs
$(MAKE) -C $(PACKAGE_DIR)/deps/stomppy
$(MAKE) -C $(PACKAGE_DIR)/deps/pika
$(PACKAGE_DIR)+clean::
rm -rf $(PACKAGE_DIR)/test/certs
$(PACKAGE_DIR)+clean-with-deps::
$(MAKE) -C $(PACKAGE_DIR)/deps/stomppy distclean
$(MAKE) -C $(PACKAGE_DIR)/deps/pika distclean
endef

View File

@ -973,7 +973,7 @@ millis_to_seconds(M) -> M div 1000.
ensure_endpoint(_Direction, {queue, []}, _Frame, _Channel, _State) ->
{error, {invalid_destination, "Destination cannot be blank"}};
ensure_endpoint(source, EndPoint, Frame, Channel, State) ->
ensure_endpoint(source, EndPoint, {_, _, Headers, _} = Frame, Channel, State) ->
Params =
case rabbit_stomp_frame:boolean_header(
Frame, ?HEADER_PERSISTENT, false) of
@ -998,10 +998,12 @@ ensure_endpoint(source, EndPoint, Frame, Channel, State) ->
end},
{durable, false}]
end,
rabbit_routing_util:ensure_endpoint(source, Channel, EndPoint, Params, State);
Arguments = rabbit_stomp_util:build_arguments(Headers),
rabbit_routing_util:ensure_endpoint(source, Channel, EndPoint, [Arguments | Params], State);
ensure_endpoint(Direction, Endpoint, _Frame, Channel, State) ->
rabbit_routing_util:ensure_endpoint(Direction, Channel, Endpoint, State).
ensure_endpoint(Direction, Endpoint, {_, _, Headers, _}, Channel, State) ->
Arguments = rabbit_stomp_util:build_arguments(Headers),
rabbit_routing_util:ensure_endpoint(Direction, Channel, Endpoint, [Arguments], State).
%%----------------------------------------------------------------------------
%% Success/error handling

View File

@ -20,7 +20,7 @@
-export([longstr_field/2]).
-export([ack_mode/1, consumer_tag_reply_to/1, consumer_tag/1, message_headers/1,
headers_post_process/1, headers/5, message_properties/1, tag_to_id/1,
msg_header_name/1, ack_header_name/1]).
msg_header_name/1, ack_header_name/1, build_arguments/1]).
-export([negotiate_version/2]).
-export([trim_headers/1]).
@ -260,6 +260,41 @@ msg_header_name("1.2") -> ?HEADER_ACK;
msg_header_name("1.1") -> ?HEADER_MESSAGE_ID;
msg_header_name("1.0") -> ?HEADER_MESSAGE_ID.
build_arguments(Headers) ->
Arguments =
lists:foldl(fun({K, V}, Acc) ->
case lists:member(K, ?HEADER_ARGUMENTS) of
true -> [build_argument(K, V) | Acc];
false -> Acc
end
end,
[],
Headers),
{arguments, Arguments}.
%% build the actual value thru pattern matching
build_argument(?HEADER_X_DEAD_LETTER_EXCHANGE, Val) ->
{list_to_binary(?HEADER_X_DEAD_LETTER_EXCHANGE), longstr,
list_to_binary(string:strip(Val))};
build_argument(?HEADER_X_DEAD_LETTER_ROUTING_KEY, Val) ->
{list_to_binary(?HEADER_X_DEAD_LETTER_ROUTING_KEY), longstr,
list_to_binary(string:strip(Val))};
build_argument(?HEADER_X_EXPIRES, Val) ->
{list_to_binary(?HEADER_X_EXPIRES), long,
list_to_integer(string:strip(Val))};
build_argument(?HEADER_X_MAX_LENGTH, Val) ->
{list_to_binary(?HEADER_X_MAX_LENGTH), long,
list_to_integer(string:strip(Val))};
build_argument(?HEADER_X_MAX_LENGTH_BYTES, Val) ->
{list_to_binary(?HEADER_X_MAX_LENGTH_BYTES), long,
list_to_integer(string:strip(Val))};
build_argument(?HEADER_X_MAX_PRIORITY, Val) ->
{list_to_binary(?HEADER_X_MAX_PRIORITY), long,
list_to_integer(string:strip(Val))};
build_argument(?HEADER_X_MESSAGE_TTL, Val) ->
{list_to_binary(?HEADER_X_MESSAGE_TTL), long,
list_to_integer(string:strip(Val))}.
%%--------------------------------------------------------------------
%% Destination Formatting
%%--------------------------------------------------------------------

View File

@ -499,8 +499,9 @@ class TestDurableSubscription(base.BaseTest):
self.__subscribe(destination, conn2, "other.id")
for l in [self.listener, listener2]:
self.assertTrue(l.await(20))
self.assertEquals(100, len(l.messages))
self.assertTrue(l.await(15))
self.assertTrue(len(l.messages) >= 90)
self.assertTrue(len(l.messages) <= 100)
finally:
conn2.disconnect()

View File

@ -0,0 +1,79 @@
import unittest
import stomp
import pika
import base
import time
class TestQueueProperties(base.BaseTest):
def test_subscribe(self):
destination = "/queue/queue-properties-subscribe-test"
# subscribe
self.subscribe_dest(self.conn, destination, None,
headers={
'x-message-ttl': 60000,
'x-expires': 70000,
'x-max-length': 10,
'x-max-length-bytes': 20000,
'x-dead-letter-exchange': 'dead-letter-exchange',
'x-dead-letter-routing-key': 'dead-letter-routing-key',
'x-max-priority': 6,
})
# now try to declare the queue using pika
# if the properties are the same we should
# not get any error
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='queue-properties-subscribe-test',
durable=True,
arguments={
'x-message-ttl': 60000,
'x-expires': 70000,
'x-max-length': 10,
'x-max-length-bytes': 20000,
'x-dead-letter-exchange': 'dead-letter-exchange',
'x-dead-letter-routing-key': 'dead-letter-routing-key',
'x-max-priority': 6,
})
self.conn.disconnect()
connection.close()
def test_send(self):
destination = "/queue/queue-properties-send-test"
# send
self.conn.send(destination, "test1",
headers={
'x-message-ttl': 60000,
'x-expires': 70000,
'x-max-length': 10,
'x-max-length-bytes': 20000,
'x-dead-letter-exchange': 'dead-letter-exchange',
'x-dead-letter-routing-key': 'dead-letter-routing-key',
'x-max-priority': 6,
})
# now try to declare the queue using pika
# if the properties are the same we should
# not get any error
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='queue-properties-send-test',
durable=True,
arguments={
'x-message-ttl': 60000,
'x-expires': 70000,
'x-max-length': 10,
'x-max-length-bytes': 20000,
'x-dead-letter-exchange': 'dead-letter-exchange',
'x-dead-letter-routing-key': 'dead-letter-routing-key',
'x-max-priority': 6,
})
self.conn.disconnect()
connection.close()

View File

@ -73,6 +73,8 @@ parse(Payload, Client = {Sock, FramesRev}, FrameState, Length) ->
case rabbit_stomp_frame:parse(Payload, FrameState) of
{ok, Frame, <<>>} ->
recv({Sock, lists:reverse([Frame | FramesRev])});
{ok, Frame, <<"\n">>} ->
recv({Sock, lists:reverse([Frame | FramesRev])});
{ok, Frame, Rest} ->
parse(Rest, {Sock, [Frame | FramesRev]},
rabbit_stomp_frame:initial_state(), Length);

View File

@ -40,33 +40,6 @@ parse_simple_frame_gen(Term) ->
#stomp_frame{body_iolist = Body} = Frame,
?assertEqual(<<"Body Content">>, iolist_to_binary(Body)).
parse_simple_frame_with_null_test() ->
Headers = [{"header1", "value1"}, {"header2", "value2"},
{?HEADER_CONTENT_LENGTH, "12"}],
Content = frame_string("COMMAND",
Headers,
"Body\0Content"),
{"COMMAND", Frame, _State} = parse_complete(Content),
[?assertEqual({ok, Value},
rabbit_stomp_frame:header(Frame, Key)) ||
{Key, Value} <- Headers],
#stomp_frame{body_iolist = Body} = Frame,
?assertEqual(<<"Body\0Content">>, iolist_to_binary(Body)).
parse_large_content_frame_with_nulls_test() ->
BodyContent = string:copies("012345678\0", 1024),
Headers = [{"header1", "value1"}, {"header2", "value2"},
{?HEADER_CONTENT_LENGTH, integer_to_list(string:len(BodyContent))}],
Content = frame_string("COMMAND",
Headers,
BodyContent),
{"COMMAND", Frame, _State} = parse_complete(Content),
[?assertEqual({ok, Value},
rabbit_stomp_frame:header(Frame, Key)) ||
{Key, Value} <- Headers],
#stomp_frame{body_iolist = Body} = Frame,
?assertEqual(list_to_binary(BodyContent), iolist_to_binary(Body)).
parse_command_only_test() ->
{ok, #stomp_frame{command = "COMMAND"}, _Rest} = parse("COMMAND\n\n\0").
@ -167,7 +140,7 @@ header_value_with_colon_test() ->
body_iolist = []}).
headers_escaping_roundtrip_test() ->
Content = "COMMAND\nhead\\r\\c\\ner:\\c\\n\\r\\\\\n\n\0",
Content = "COMMAND\nhead\\r\\c\\ner:\\c\\n\\r\\\\\n\n\0\n",
{ok, Frame, _} = parse(Content),
{ok, Val} = rabbit_stomp_frame:header(Frame, "head\r:\ner"),
?assertEqual(":\n\r\\", Val),
@ -189,5 +162,5 @@ frame_string(Command, Headers, BodyContent) ->
frame_string(Command, Headers, BodyContent, Term) ->
HeaderString =
lists:flatten([Key ++ ":" ++ Value ++ Term || {Key, Value} <- Headers]),
Command ++ Term ++ HeaderString ++ Term ++ BodyContent ++ "\0".
Command ++ Term ++ HeaderString ++ Term ++ BodyContent ++ "\0" ++ "\n".

View File

@ -11,6 +11,7 @@ if __name__ == '__main__':
'ack',
'errors',
'reliability',
'queue_properties',
]
test_runner.run_unittests(modules)

View File

@ -7,6 +7,7 @@ import os
def add_deps_to_path():
deps_dir = os.path.realpath(os.path.join(__file__, "..", "..", "..", "deps"))
sys.path.append(os.path.join(deps_dir, "stomppy", "stomppy"))
sys.path.append(os.path.join(deps_dir, "pika", "pika"))
def run_unittests(modules):
add_deps_to_path()