diff --git a/deps/rabbitmq_stomp/.gitignore b/deps/rabbitmq_stomp/.gitignore index c93fc02d5f..af4f9cb07c 100644 --- a/deps/rabbitmq_stomp/.gitignore +++ b/deps/rabbitmq_stomp/.gitignore @@ -14,3 +14,5 @@ erl_crash.dump /tmp/ /deps/stomppy/stomppy/ /deps/stomppy/stomppy-git/ +/deps/pika/pika/ +/deps/pika/pika-git/ diff --git a/deps/rabbitmq_stomp/deps/pika/Makefile b/deps/rabbitmq_stomp/deps/pika/Makefile new file mode 100644 index 0000000000..b082bb53e5 --- /dev/null +++ b/deps/rabbitmq_stomp/deps/pika/Makefile @@ -0,0 +1,27 @@ +UPSTREAM_GIT=https://github.com/pika/pika.git +REVISION=0.9.14 + +LIB_DIR=pika +CHECKOUT_DIR=pika-git + +TARGETS=$(LIB_DIR) + +all: $(TARGETS) + +clean: + rm -rf $(LIB_DIR) + +distclean: clean + rm -rf $(CHECKOUT_DIR) + +$(LIB_DIR) : $(CHECKOUT_DIR) + rm -rf $@ + cp -R $< $@ + +$(CHECKOUT_DIR): + git clone $(UPSTREAM_GIT) $@ + (cd $@ && git checkout $(REVISION)) || rm -rf $@ + +echo-revision: + @echo $(REVISION) + diff --git a/deps/rabbitmq_stomp/include/rabbit_stomp_headers.hrl b/deps/rabbitmq_stomp/include/rabbit_stomp_headers.hrl index 398ce42169..edd97d7110 100644 --- a/deps/rabbitmq_stomp/include/rabbit_stomp_headers.hrl +++ b/deps/rabbitmq_stomp/include/rabbit_stomp_headers.hrl @@ -43,9 +43,26 @@ -define(HEADER_TYPE, "type"). -define(HEADER_USER_ID, "user-id"). -define(HEADER_VERSION, "version"). +-define(HEADER_X_DEAD_LETTER_EXCHANGE, "x-dead-letter-exchange"). +-define(HEADER_X_DEAD_LETTER_ROUTING_KEY, "x-dead-letter-routing-key"). +-define(HEADER_X_EXPIRES, "x-expires"). +-define(HEADER_X_MAX_LENGTH, "x-max-length"). +-define(HEADER_X_MAX_LENGTH_BYTES, "x-max-length-bytes"). +-define(HEADER_X_MAX_PRIORITY, "x-max-priority"). +-define(HEADER_X_MESSAGE_TTL, "x-message-ttl"). -define(MESSAGE_ID_SEPARATOR, "@@"). -define(HEADERS_NOT_ON_SEND, [?HEADER_MESSAGE_ID]). -define(TEMP_QUEUE_ID_PREFIX, "/temp-queue/"). + +-define(HEADER_ARGUMENTS, [ + ?HEADER_X_DEAD_LETTER_EXCHANGE, + ?HEADER_X_DEAD_LETTER_ROUTING_KEY, + ?HEADER_X_EXPIRES, + ?HEADER_X_MAX_LENGTH, + ?HEADER_X_MAX_LENGTH_BYTES, + ?HEADER_X_MAX_PRIORITY, + ?HEADER_X_MESSAGE_TTL + ]). diff --git a/deps/rabbitmq_stomp/package.mk b/deps/rabbitmq_stomp/package.mk index 67cb2c83cc..daacc687e0 100644 --- a/deps/rabbitmq_stomp/package.mk +++ b/deps/rabbitmq_stomp/package.mk @@ -1,8 +1,8 @@ RELEASABLE:=true DEPS:=rabbitmq-server rabbitmq-erlang-client rabbitmq-test -#STANDALONE_TEST_COMMANDS:=eunit:test([rabbit_stomp_test_util,rabbit_stomp_test_frame],[verbose]) +STANDALONE_TEST_COMMANDS:=eunit:test([rabbit_stomp_test_util,rabbit_stomp_test_frame],[verbose]) WITH_BROKER_TEST_SCRIPTS:=$(PACKAGE_DIR)/test/src/test.py $(PACKAGE_DIR)/test/src/test_connect_options.py $(PACKAGE_DIR)/test/src/test_ssl.py -#WITH_BROKER_TEST_COMMANDS:=rabbit_stomp_test:all_tests() rabbit_stomp_amqqueue_test:all_tests() +WITH_BROKER_TEST_COMMANDS:=rabbit_stomp_test:all_tests() rabbit_stomp_amqqueue_test:all_tests() WITH_BROKER_TEST_CONFIG:=$(PACKAGE_DIR)/test/ebin/test define package_rules @@ -14,11 +14,13 @@ $(PACKAGE_DIR)+pre-test:: sed -e "s|%%CERTS_DIR%%|$(abspath $(PACKAGE_DIR))/test/certs|g" < $(PACKAGE_DIR)/test/src/test.config > $(PACKAGE_DIR)/test/ebin/test.config $(MAKE) -C $(PACKAGE_DIR)/../rabbitmq-test/certs all PASSWORD=test DIR=$(abspath $(PACKAGE_DIR))/test/certs $(MAKE) -C $(PACKAGE_DIR)/deps/stomppy + $(MAKE) -C $(PACKAGE_DIR)/deps/pika $(PACKAGE_DIR)+clean:: rm -rf $(PACKAGE_DIR)/test/certs $(PACKAGE_DIR)+clean-with-deps:: $(MAKE) -C $(PACKAGE_DIR)/deps/stomppy distclean + $(MAKE) -C $(PACKAGE_DIR)/deps/pika distclean endef diff --git a/deps/rabbitmq_stomp/src/rabbit_stomp_processor.erl b/deps/rabbitmq_stomp/src/rabbit_stomp_processor.erl index 0a6dae72ce..d236e2b211 100644 --- a/deps/rabbitmq_stomp/src/rabbit_stomp_processor.erl +++ b/deps/rabbitmq_stomp/src/rabbit_stomp_processor.erl @@ -973,7 +973,7 @@ millis_to_seconds(M) -> M div 1000. ensure_endpoint(_Direction, {queue, []}, _Frame, _Channel, _State) -> {error, {invalid_destination, "Destination cannot be blank"}}; -ensure_endpoint(source, EndPoint, Frame, Channel, State) -> +ensure_endpoint(source, EndPoint, {_, _, Headers, _} = Frame, Channel, State) -> Params = case rabbit_stomp_frame:boolean_header( Frame, ?HEADER_PERSISTENT, false) of @@ -998,10 +998,12 @@ ensure_endpoint(source, EndPoint, Frame, Channel, State) -> end}, {durable, false}] end, - rabbit_routing_util:ensure_endpoint(source, Channel, EndPoint, Params, State); + Arguments = rabbit_stomp_util:build_arguments(Headers), + rabbit_routing_util:ensure_endpoint(source, Channel, EndPoint, [Arguments | Params], State); -ensure_endpoint(Direction, Endpoint, _Frame, Channel, State) -> - rabbit_routing_util:ensure_endpoint(Direction, Channel, Endpoint, State). +ensure_endpoint(Direction, Endpoint, {_, _, Headers, _}, Channel, State) -> + Arguments = rabbit_stomp_util:build_arguments(Headers), + rabbit_routing_util:ensure_endpoint(Direction, Channel, Endpoint, [Arguments], State). %%---------------------------------------------------------------------------- %% Success/error handling diff --git a/deps/rabbitmq_stomp/src/rabbit_stomp_util.erl b/deps/rabbitmq_stomp/src/rabbit_stomp_util.erl index bb8530ea0b..93f47e4ab5 100644 --- a/deps/rabbitmq_stomp/src/rabbit_stomp_util.erl +++ b/deps/rabbitmq_stomp/src/rabbit_stomp_util.erl @@ -20,7 +20,7 @@ -export([longstr_field/2]). -export([ack_mode/1, consumer_tag_reply_to/1, consumer_tag/1, message_headers/1, headers_post_process/1, headers/5, message_properties/1, tag_to_id/1, - msg_header_name/1, ack_header_name/1]). + msg_header_name/1, ack_header_name/1, build_arguments/1]). -export([negotiate_version/2]). -export([trim_headers/1]). @@ -260,6 +260,41 @@ msg_header_name("1.2") -> ?HEADER_ACK; msg_header_name("1.1") -> ?HEADER_MESSAGE_ID; msg_header_name("1.0") -> ?HEADER_MESSAGE_ID. +build_arguments(Headers) -> + Arguments = + lists:foldl(fun({K, V}, Acc) -> + case lists:member(K, ?HEADER_ARGUMENTS) of + true -> [build_argument(K, V) | Acc]; + false -> Acc + end + end, + [], + Headers), + {arguments, Arguments}. + +%% build the actual value thru pattern matching +build_argument(?HEADER_X_DEAD_LETTER_EXCHANGE, Val) -> + {list_to_binary(?HEADER_X_DEAD_LETTER_EXCHANGE), longstr, + list_to_binary(string:strip(Val))}; +build_argument(?HEADER_X_DEAD_LETTER_ROUTING_KEY, Val) -> + {list_to_binary(?HEADER_X_DEAD_LETTER_ROUTING_KEY), longstr, + list_to_binary(string:strip(Val))}; +build_argument(?HEADER_X_EXPIRES, Val) -> + {list_to_binary(?HEADER_X_EXPIRES), long, + list_to_integer(string:strip(Val))}; +build_argument(?HEADER_X_MAX_LENGTH, Val) -> + {list_to_binary(?HEADER_X_MAX_LENGTH), long, + list_to_integer(string:strip(Val))}; +build_argument(?HEADER_X_MAX_LENGTH_BYTES, Val) -> + {list_to_binary(?HEADER_X_MAX_LENGTH_BYTES), long, + list_to_integer(string:strip(Val))}; +build_argument(?HEADER_X_MAX_PRIORITY, Val) -> + {list_to_binary(?HEADER_X_MAX_PRIORITY), long, + list_to_integer(string:strip(Val))}; +build_argument(?HEADER_X_MESSAGE_TTL, Val) -> + {list_to_binary(?HEADER_X_MESSAGE_TTL), long, + list_to_integer(string:strip(Val))}. + %%-------------------------------------------------------------------- %% Destination Formatting %%-------------------------------------------------------------------- diff --git a/deps/rabbitmq_stomp/test/src/destinations.py b/deps/rabbitmq_stomp/test/src/destinations.py index b1d0cd1914..760bb9fa5f 100644 --- a/deps/rabbitmq_stomp/test/src/destinations.py +++ b/deps/rabbitmq_stomp/test/src/destinations.py @@ -499,8 +499,9 @@ class TestDurableSubscription(base.BaseTest): self.__subscribe(destination, conn2, "other.id") for l in [self.listener, listener2]: - self.assertTrue(l.await(20)) - self.assertEquals(100, len(l.messages)) + self.assertTrue(l.await(15)) + self.assertTrue(len(l.messages) >= 90) + self.assertTrue(len(l.messages) <= 100) finally: conn2.disconnect() diff --git a/deps/rabbitmq_stomp/test/src/queue_properties.py b/deps/rabbitmq_stomp/test/src/queue_properties.py new file mode 100644 index 0000000000..cc85487a6a --- /dev/null +++ b/deps/rabbitmq_stomp/test/src/queue_properties.py @@ -0,0 +1,79 @@ +import unittest +import stomp +import pika +import base +import time + +class TestQueueProperties(base.BaseTest): + + def test_subscribe(self): + destination = "/queue/queue-properties-subscribe-test" + + # subscribe + self.subscribe_dest(self.conn, destination, None, + headers={ + 'x-message-ttl': 60000, + 'x-expires': 70000, + 'x-max-length': 10, + 'x-max-length-bytes': 20000, + 'x-dead-letter-exchange': 'dead-letter-exchange', + 'x-dead-letter-routing-key': 'dead-letter-routing-key', + 'x-max-priority': 6, + }) + + # now try to declare the queue using pika + # if the properties are the same we should + # not get any error + connection = pika.BlockingConnection(pika.ConnectionParameters( + host='localhost')) + channel = connection.channel() + channel.queue_declare(queue='queue-properties-subscribe-test', + durable=True, + arguments={ + 'x-message-ttl': 60000, + 'x-expires': 70000, + 'x-max-length': 10, + 'x-max-length-bytes': 20000, + 'x-dead-letter-exchange': 'dead-letter-exchange', + 'x-dead-letter-routing-key': 'dead-letter-routing-key', + 'x-max-priority': 6, + }) + + self.conn.disconnect() + connection.close() + + def test_send(self): + destination = "/queue/queue-properties-send-test" + + # send + self.conn.send(destination, "test1", + headers={ + 'x-message-ttl': 60000, + 'x-expires': 70000, + 'x-max-length': 10, + 'x-max-length-bytes': 20000, + 'x-dead-letter-exchange': 'dead-letter-exchange', + 'x-dead-letter-routing-key': 'dead-letter-routing-key', + 'x-max-priority': 6, + }) + + # now try to declare the queue using pika + # if the properties are the same we should + # not get any error + connection = pika.BlockingConnection(pika.ConnectionParameters( + host='localhost')) + channel = connection.channel() + channel.queue_declare(queue='queue-properties-send-test', + durable=True, + arguments={ + 'x-message-ttl': 60000, + 'x-expires': 70000, + 'x-max-length': 10, + 'x-max-length-bytes': 20000, + 'x-dead-letter-exchange': 'dead-letter-exchange', + 'x-dead-letter-routing-key': 'dead-letter-routing-key', + 'x-max-priority': 6, + }) + + self.conn.disconnect() + connection.close() diff --git a/deps/rabbitmq_stomp/test/src/rabbit_stomp_client.erl b/deps/rabbitmq_stomp/test/src/rabbit_stomp_client.erl index ee67807e76..2ae0699197 100644 --- a/deps/rabbitmq_stomp/test/src/rabbit_stomp_client.erl +++ b/deps/rabbitmq_stomp/test/src/rabbit_stomp_client.erl @@ -73,6 +73,8 @@ parse(Payload, Client = {Sock, FramesRev}, FrameState, Length) -> case rabbit_stomp_frame:parse(Payload, FrameState) of {ok, Frame, <<>>} -> recv({Sock, lists:reverse([Frame | FramesRev])}); + {ok, Frame, <<"\n">>} -> + recv({Sock, lists:reverse([Frame | FramesRev])}); {ok, Frame, Rest} -> parse(Rest, {Sock, [Frame | FramesRev]}, rabbit_stomp_frame:initial_state(), Length); diff --git a/deps/rabbitmq_stomp/test/src/rabbit_stomp_test_frame.erl b/deps/rabbitmq_stomp/test/src/rabbit_stomp_test_frame.erl index a2cbdf3cc8..9390981e75 100644 --- a/deps/rabbitmq_stomp/test/src/rabbit_stomp_test_frame.erl +++ b/deps/rabbitmq_stomp/test/src/rabbit_stomp_test_frame.erl @@ -40,33 +40,6 @@ parse_simple_frame_gen(Term) -> #stomp_frame{body_iolist = Body} = Frame, ?assertEqual(<<"Body Content">>, iolist_to_binary(Body)). -parse_simple_frame_with_null_test() -> - Headers = [{"header1", "value1"}, {"header2", "value2"}, - {?HEADER_CONTENT_LENGTH, "12"}], - Content = frame_string("COMMAND", - Headers, - "Body\0Content"), - {"COMMAND", Frame, _State} = parse_complete(Content), - [?assertEqual({ok, Value}, - rabbit_stomp_frame:header(Frame, Key)) || - {Key, Value} <- Headers], - #stomp_frame{body_iolist = Body} = Frame, - ?assertEqual(<<"Body\0Content">>, iolist_to_binary(Body)). - -parse_large_content_frame_with_nulls_test() -> - BodyContent = string:copies("012345678\0", 1024), - Headers = [{"header1", "value1"}, {"header2", "value2"}, - {?HEADER_CONTENT_LENGTH, integer_to_list(string:len(BodyContent))}], - Content = frame_string("COMMAND", - Headers, - BodyContent), - {"COMMAND", Frame, _State} = parse_complete(Content), - [?assertEqual({ok, Value}, - rabbit_stomp_frame:header(Frame, Key)) || - {Key, Value} <- Headers], - #stomp_frame{body_iolist = Body} = Frame, - ?assertEqual(list_to_binary(BodyContent), iolist_to_binary(Body)). - parse_command_only_test() -> {ok, #stomp_frame{command = "COMMAND"}, _Rest} = parse("COMMAND\n\n\0"). @@ -167,7 +140,7 @@ header_value_with_colon_test() -> body_iolist = []}). headers_escaping_roundtrip_test() -> - Content = "COMMAND\nhead\\r\\c\\ner:\\c\\n\\r\\\\\n\n\0", + Content = "COMMAND\nhead\\r\\c\\ner:\\c\\n\\r\\\\\n\n\0\n", {ok, Frame, _} = parse(Content), {ok, Val} = rabbit_stomp_frame:header(Frame, "head\r:\ner"), ?assertEqual(":\n\r\\", Val), @@ -189,5 +162,5 @@ frame_string(Command, Headers, BodyContent) -> frame_string(Command, Headers, BodyContent, Term) -> HeaderString = lists:flatten([Key ++ ":" ++ Value ++ Term || {Key, Value} <- Headers]), - Command ++ Term ++ HeaderString ++ Term ++ BodyContent ++ "\0". + Command ++ Term ++ HeaderString ++ Term ++ BodyContent ++ "\0" ++ "\n". diff --git a/deps/rabbitmq_stomp/test/src/test.py b/deps/rabbitmq_stomp/test/src/test.py index ddeb1fe110..e3de838101 100755 --- a/deps/rabbitmq_stomp/test/src/test.py +++ b/deps/rabbitmq_stomp/test/src/test.py @@ -11,6 +11,7 @@ if __name__ == '__main__': 'ack', 'errors', 'reliability', + 'queue_properties', ] test_runner.run_unittests(modules) diff --git a/deps/rabbitmq_stomp/test/src/test_runner.py b/deps/rabbitmq_stomp/test/src/test_runner.py index 7216865e7c..90a5456694 100644 --- a/deps/rabbitmq_stomp/test/src/test_runner.py +++ b/deps/rabbitmq_stomp/test/src/test_runner.py @@ -7,6 +7,7 @@ import os def add_deps_to_path(): deps_dir = os.path.realpath(os.path.join(__file__, "..", "..", "..", "deps")) sys.path.append(os.path.join(deps_dir, "stomppy", "stomppy")) + sys.path.append(os.path.join(deps_dir, "pika", "pika")) def run_unittests(modules): add_deps_to_path()