Merge with default
This commit is contained in:
commit
8dd2b3e720
|
|
@ -2,7 +2,7 @@ UPSTREAM_HG=https://stomppy.googlecode.com/hg/
|
|||
REVISION=16a4000624a7
|
||||
|
||||
LIB_DIR=stomppy
|
||||
CHECKOUT_DIR=stomppy-git
|
||||
CHECKOUT_DIR=stomppy-hg
|
||||
|
||||
TARGETS=$(LIB_DIR)
|
||||
|
||||
|
|
|
|||
|
|
@ -1,13 +1,52 @@
|
|||
diff -r 16a4000624a7 stomp/connect.py
|
||||
--- a/stomp/connect.py Sun May 02 18:15:34 2010 +0100
|
||||
+++ b/stomp/connect.py Thu Oct 07 11:04:59 2010 +0100
|
||||
@@ -449,6 +449,9 @@
|
||||
+++ b/stomp/connect.py Wed Nov 10 20:44:32 2010 +0000
|
||||
@@ -88,7 +88,9 @@
|
||||
ssl_key_file = None,
|
||||
ssl_cert_file = None,
|
||||
ssl_ca_certs = None,
|
||||
- ssl_cert_validator = None):
|
||||
+ ssl_cert_validator = None,
|
||||
+ version = None,
|
||||
+ heartbeat = None):
|
||||
"""
|
||||
Initialize and start this connection.
|
||||
|
||||
@@ -204,6 +206,12 @@
|
||||
if user is not None and passcode is not None:
|
||||
self.__connect_headers['login'] = user
|
||||
self.__connect_headers['passcode'] = passcode
|
||||
+
|
||||
+ if version is not None:
|
||||
+ self.__connect_headers['accept-version'] = version
|
||||
+
|
||||
+ if heartbeat is not None:
|
||||
+ self.__connect_headers['heartbeat'] = heartbeat
|
||||
|
||||
self.__socket = None
|
||||
self.__socket_semaphore = threading.BoundedSemaphore(1)
|
||||
@@ -449,6 +457,16 @@
|
||||
raise KeyError("Command %s requires header %r" % (command, required_header_key))
|
||||
self.__send_frame(command, headers, payload)
|
||||
|
||||
+ def send_frame(self, command, headers={}, payload=''):
|
||||
+ self.__send_frame(command, headers, payload)
|
||||
+
|
||||
+ def send_heartbeat(self):
|
||||
+ self.__socket_semaphore.acquire()
|
||||
+ try:
|
||||
+ self.__socket.sendall('\x00')
|
||||
+ finally:
|
||||
+ self.__socket_semaphore.release()
|
||||
+
|
||||
def __send_frame(self, command, headers={}, payload=''):
|
||||
"""
|
||||
Send a STOMP frame.
|
||||
@@ -532,7 +550,6 @@
|
||||
|
||||
while self.__running:
|
||||
frames = self.__read()
|
||||
-
|
||||
for frame in frames:
|
||||
(frame_type, headers, body) = utils.parse_frame(frame)
|
||||
log.debug("Received frame: result=%r, headers=%r, body=%r" % (frame_type, headers, body))
|
||||
|
|
|
|||
|
|
@ -39,7 +39,8 @@ start_link(Sock) ->
|
|||
{ok, ProcessorPid} =
|
||||
supervisor2:start_child(SupPid,
|
||||
{rabbit_stomp_processor,
|
||||
{rabbit_stomp_processor, start_link, [Sock]},
|
||||
{rabbit_stomp_processor, start_link,
|
||||
[Sock, rabbit_heartbeat:start_heartbeat_fun(SupPid)]},
|
||||
intrinsic, ?MAX_WAIT, worker,
|
||||
[rabbit_stomp_processor]}),
|
||||
{ok, ReaderPid} =
|
||||
|
|
|
|||
|
|
@ -31,20 +31,27 @@
|
|||
-module(rabbit_stomp_processor).
|
||||
-behaviour(gen_server).
|
||||
|
||||
-export([start_link/1, process_frame/2]).
|
||||
-export([start_link/2, process_frame/2]).
|
||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
||||
code_change/3, terminate/2]).
|
||||
|
||||
-include_lib("amqp_client/include/amqp_client.hrl").
|
||||
-include("rabbit_stomp_frame.hrl").
|
||||
|
||||
-record(state, {socket, session_id, channel, connection, subscriptions}).
|
||||
-record(state, {socket, session_id, channel,
|
||||
connection, subscriptions, version,
|
||||
start_heartbeat_fun}).
|
||||
|
||||
-record(subscription, {dest_hdr, channel, ack_mode, multi_ack}).
|
||||
|
||||
-define(SUPPORTED_VERSIONS, ["1.0", "1.1"]).
|
||||
-define(DEFAULT_QUEUE_PREFETCH, 1).
|
||||
|
||||
%%----------------------------------------------------------------------------
|
||||
%% Public API
|
||||
%%----------------------------------------------------------------------------
|
||||
start_link(Sock) ->
|
||||
gen_server:start_link(?MODULE, [Sock], []).
|
||||
start_link(Sock, StartHeartbeatFun) ->
|
||||
gen_server:start_link(?MODULE, [Sock,StartHeartbeatFun], []).
|
||||
|
||||
process_frame(Pid, Frame = #stomp_frame{command = Command}) ->
|
||||
gen_server:cast(Pid, {Command, Frame}).
|
||||
|
|
@ -53,32 +60,46 @@ process_frame(Pid, Frame = #stomp_frame{command = Command}) ->
|
|||
%% Basic gen_server callbacks
|
||||
%%----------------------------------------------------------------------------
|
||||
|
||||
init([Sock]) ->
|
||||
init([Sock, StartHeartbeatFun]) ->
|
||||
process_flag(trap_exit, true),
|
||||
{ok,
|
||||
#state {
|
||||
socket = Sock,
|
||||
session_id = none,
|
||||
channel = none,
|
||||
connection = none,
|
||||
subscriptions = dict:new()}
|
||||
socket = Sock,
|
||||
session_id = none,
|
||||
channel = none,
|
||||
connection = none,
|
||||
subscriptions = dict:new(),
|
||||
version = none,
|
||||
start_heartbeat_fun = StartHeartbeatFun}
|
||||
}.
|
||||
|
||||
terminate(_Reason, State) ->
|
||||
shutdown_channel_and_connection(State).
|
||||
|
||||
handle_cast({"CONNECT", Frame}, State = #state{channel = none}) ->
|
||||
{ok, DefaultVHost} = application:get_env(rabbit, default_vhost),
|
||||
process_request(
|
||||
fun(StateN) ->
|
||||
do_login(rabbit_stomp_frame:header(Frame, "login"),
|
||||
rabbit_stomp_frame:header(Frame, "passcode"),
|
||||
rabbit_stomp_frame:header(Frame, "virtual-host",
|
||||
binary_to_list(DefaultVHost)),
|
||||
StateN)
|
||||
end,
|
||||
fun(StateM) -> StateM end,
|
||||
State);
|
||||
case negotiate_version(Frame) of
|
||||
{ok, Version} ->
|
||||
{ok, DefaultVHost} = application:get_env(rabbit, default_vhost),
|
||||
process_request(
|
||||
fun(StateN) ->
|
||||
do_login(rabbit_stomp_frame:header(Frame, "login"),
|
||||
rabbit_stomp_frame:header(Frame, "passcode"),
|
||||
rabbit_stomp_frame:header(Frame, "host",
|
||||
binary_to_list(
|
||||
DefaultVHost)),
|
||||
rabbit_stomp_frame:header(Frame, "heartbeat",
|
||||
"0,0"),
|
||||
Version,
|
||||
StateN)
|
||||
end,
|
||||
fun(StateM) -> StateM end,
|
||||
State);
|
||||
{error, no_common_version} ->
|
||||
error("Version mismatch",
|
||||
"Supported versions are ~s\n",
|
||||
[string:join(?SUPPORTED_VERSIONS, ",")],
|
||||
State)
|
||||
end;
|
||||
|
||||
handle_cast(_Request, State = #state{channel = none}) ->
|
||||
error("Illegal command", "You must log in using CONNECT first\n", State);
|
||||
|
|
@ -91,7 +112,10 @@ handle_cast({Command, Frame}, State) ->
|
|||
fun(StateM) ->
|
||||
ensure_receipt(Frame, StateM)
|
||||
end,
|
||||
State).
|
||||
State);
|
||||
|
||||
handle_cast(client_timeout, State) ->
|
||||
{stop, client_timeout, State}.
|
||||
|
||||
handle_info(#'basic.consume_ok'{}, State) ->
|
||||
{noreply, State};
|
||||
|
|
@ -168,10 +192,12 @@ handle_frame("ACK", Frame, State = #state{session_id = SessionId,
|
|||
{ok, IdStr} ->
|
||||
case rabbit_stomp_util:parse_message_id(IdStr) of
|
||||
{ok, {ConsumerTag, SessionId, DeliveryTag}} ->
|
||||
{_DestHdr, SubChannel} = dict:fetch(ConsumerTag, Subs),
|
||||
#subscription{channel = SubChannel,
|
||||
multi_ack = IsMulti} =
|
||||
dict:fetch(ConsumerTag, Subs),
|
||||
|
||||
Method = #'basic.ack'{delivery_tag = DeliveryTag,
|
||||
multiple = false},
|
||||
multiple = IsMulti},
|
||||
|
||||
case transactional(Frame) of
|
||||
{yes, Transaction} ->
|
||||
|
|
@ -237,7 +263,7 @@ with_destination(Command, Frame, State, Fun) ->
|
|||
State)
|
||||
end.
|
||||
|
||||
do_login({ok, Login}, {ok, Passcode}, VirtualHost, State) ->
|
||||
do_login({ok, Login}, {ok, Passcode}, VirtualHost, Heartbeat, Version, State) ->
|
||||
{ok, Connection} = amqp_connection:start(
|
||||
direct, #amqp_params{
|
||||
username = list_to_binary(Login),
|
||||
|
|
@ -245,11 +271,18 @@ do_login({ok, Login}, {ok, Passcode}, VirtualHost, State) ->
|
|||
virtual_host = list_to_binary(VirtualHost)}),
|
||||
{ok, Channel} = amqp_connection:open_channel(Connection),
|
||||
SessionId = rabbit_guid:string_guid("session"),
|
||||
ok("CONNECTED",[{"session", SessionId}], "",
|
||||
State#state{session_id = SessionId,
|
||||
|
||||
{{SX, SY}, State1} = ensure_heartbeats(Heartbeat, State),
|
||||
ok("CONNECTED",
|
||||
[{"session", SessionId},
|
||||
{"heartbeat", io_lib:format("~B,~B", [SX, SY])},
|
||||
{"version", Version}],
|
||||
"",
|
||||
State1#state{session_id = SessionId,
|
||||
channel = Channel,
|
||||
connection = Connection});
|
||||
do_login(_, _, _, State) ->
|
||||
|
||||
do_login(_, _, _, _, _, State) ->
|
||||
error("Bad CONNECT", "Missing login or passcode header(s)\n", State).
|
||||
|
||||
do_subscribe(Destination, DestHdr, Frame,
|
||||
|
|
@ -257,19 +290,22 @@ do_subscribe(Destination, DestHdr, Frame,
|
|||
connection = Connection,
|
||||
channel = MainChannel}) ->
|
||||
|
||||
Channel = case Destination of
|
||||
{queue, _} ->
|
||||
Prefetch = rabbit_stomp_frame:integer_header(Frame, "prefetch-count",
|
||||
default_prefetch(Destination)),
|
||||
|
||||
Channel = case Prefetch of
|
||||
undefined ->
|
||||
MainChannel;
|
||||
_ ->
|
||||
{ok, Channel1} = amqp_connection:open_channel(Connection),
|
||||
amqp_channel:call(Channel1,
|
||||
#'basic.qos'{prefetch_size = 0,
|
||||
prefetch_count = 1,
|
||||
prefetch_count = Prefetch,
|
||||
global = false}),
|
||||
Channel1;
|
||||
_ ->
|
||||
MainChannel
|
||||
Channel1
|
||||
end,
|
||||
|
||||
AckMode = rabbit_stomp_util:ack_mode(Frame),
|
||||
{AckMode, IsMulti} = rabbit_stomp_util:ack_mode(Frame),
|
||||
|
||||
{ok, Queue} = ensure_queue(subscribe, Destination, Channel),
|
||||
|
||||
|
|
@ -288,7 +324,12 @@ do_subscribe(Destination, DestHdr, Frame,
|
|||
ok = ensure_queue_binding(Queue, ExchangeAndKey, Channel),
|
||||
|
||||
ok(State#state{subscriptions =
|
||||
dict:store(ConsumerTag, {DestHdr, Channel}, Subs)}).
|
||||
dict:store(ConsumerTag,
|
||||
#subscription{dest_hdr = DestHdr,
|
||||
channel = Channel,
|
||||
ack_mode = AckMode,
|
||||
multi_ack = IsMulti},
|
||||
Subs)}).
|
||||
|
||||
do_send(Destination, _DestHdr,
|
||||
Frame = #stomp_frame{body_iolist = BodyFragments},
|
||||
|
|
@ -315,6 +356,13 @@ do_send(Destination, _DestHdr,
|
|||
ok(send_method(Method, Props, BodyFragments, State))
|
||||
end.
|
||||
|
||||
negotiate_version(Frame) ->
|
||||
ClientVers = re:split(
|
||||
rabbit_stomp_frame:header(Frame, "accept-version", "1.0"),
|
||||
",",
|
||||
[{return, list}]),
|
||||
rabbit_stomp_util:negotiate_version(ClientVers, ?SUPPORTED_VERSIONS).
|
||||
|
||||
ensure_receipt(Frame, State) ->
|
||||
case rabbit_stomp_frame:header(Frame, "receipt") of
|
||||
{ok, Id} -> send_frame("RECEIPT", [{"receipt-id", Id}], "", State);
|
||||
|
|
@ -325,7 +373,7 @@ send_delivery(Delivery = #'basic.deliver'{consumer_tag = ConsumerTag},
|
|||
Properties, Body,
|
||||
State = #state{session_id = SessionId,
|
||||
subscriptions = Subs}) ->
|
||||
{Destination, _SubChannel} = dict:fetch(ConsumerTag, Subs),
|
||||
#subscription{dest_hdr = Destination} = dict:fetch(ConsumerTag, Subs),
|
||||
|
||||
send_frame(
|
||||
"MESSAGE",
|
||||
|
|
@ -349,7 +397,7 @@ shutdown_channel_and_connection(State = #state{channel = Channel,
|
|||
connection = Connection,
|
||||
subscriptions = Subs}) ->
|
||||
dict:fold(
|
||||
fun(_ConsumerTag, {_DestHdr, SubChannel}, Acc) ->
|
||||
fun(_ConsumerTag, #subscription{channel = SubChannel}, Acc) ->
|
||||
case SubChannel of
|
||||
Channel -> Acc;
|
||||
_ ->
|
||||
|
|
@ -362,6 +410,10 @@ shutdown_channel_and_connection(State = #state{channel = Channel,
|
|||
amqp_connection:close(Connection),
|
||||
State#state{channel = none, connection = none}.
|
||||
|
||||
default_prefetch({queue, _}) ->
|
||||
?DEFAULT_QUEUE_PREFETCH;
|
||||
default_prefetch(_) ->
|
||||
undefined.
|
||||
|
||||
%%----------------------------------------------------------------------------
|
||||
%% Transaction Support
|
||||
|
|
@ -416,7 +468,7 @@ commit_transaction(Transaction, State0) ->
|
|||
State,
|
||||
Actions),
|
||||
erase({transaction, Transaction}),
|
||||
ok(State)
|
||||
ok(FinalState)
|
||||
end).
|
||||
|
||||
abort_transaction(Transaction, State0) ->
|
||||
|
|
@ -435,6 +487,39 @@ perform_transaction_action({Channel, Method}, State) ->
|
|||
perform_transaction_action({Method, Props, BodyFragments}, State) ->
|
||||
send_method(Method, Props, BodyFragments, State).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Heartbeat Management
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
ensure_heartbeats(Heartbeats,
|
||||
State = #state{socket = Sock, start_heartbeat_fun = SHF}) ->
|
||||
[CX, CY] = [list_to_integer(X) ||
|
||||
X <- re:split(Heartbeats, ",", [{return, list}])],
|
||||
|
||||
SendFun = fun() ->
|
||||
catch gen_tcp:send(Sock, <<0>>)
|
||||
end,
|
||||
|
||||
Pid = self(),
|
||||
ReceiveFun = fun() ->
|
||||
gen_server:cast(Pid, client_timeout)
|
||||
end,
|
||||
|
||||
{SendTimeout, ReceiveTimeout} =
|
||||
{millis_to_seconds(CY), millis_to_seconds(CX)},
|
||||
|
||||
SHF(Sock, SendTimeout, SendFun, ReceiveTimeout, ReceiveFun),
|
||||
|
||||
{{SendTimeout * 1000 , ReceiveTimeout * 1000}, State}.
|
||||
|
||||
millis_to_seconds(M) when M =< 0 ->
|
||||
0;
|
||||
millis_to_seconds(M) ->
|
||||
case M < 1000 of
|
||||
true -> 1;
|
||||
false -> M div 1000
|
||||
end.
|
||||
|
||||
%%----------------------------------------------------------------------------
|
||||
%% Queue and Binding Setup
|
||||
%%----------------------------------------------------------------------------
|
||||
|
|
@ -542,7 +627,9 @@ send_frame(Frame, State = #state{socket = Sock}) ->
|
|||
|
||||
send_error(Message, Detail, State) ->
|
||||
send_frame("ERROR", [{"message", Message},
|
||||
{"content-type", "text/plain"}], Detail, State).
|
||||
{"content-type", "text/plain"},
|
||||
{"version", string:join(?SUPPORTED_VERSIONS, ",")}],
|
||||
Detail, State).
|
||||
|
||||
%%----------------------------------------------------------------------------
|
||||
%% Skeleton gen_server callbacks
|
||||
|
|
|
|||
|
|
@ -75,11 +75,17 @@ process_received_bytes(Bytes,
|
|||
State = #reader_state{
|
||||
processor = Processor,
|
||||
parse_state = ParseState}) ->
|
||||
Resume =
|
||||
fun(RestBytes) ->
|
||||
PS = rabbit_stomp_frame:initial_state(),
|
||||
process_received_bytes(RestBytes,
|
||||
State#reader_state{parse_state = PS})
|
||||
end,
|
||||
|
||||
case rabbit_stomp_frame:parse(Bytes, ParseState) of
|
||||
{more, ParseState1} ->
|
||||
?MODULE:mainloop(State#reader_state{parse_state = ParseState1});
|
||||
{ok, Frame, Rest} ->
|
||||
rabbit_stomp_processor:process_frame(Processor, Frame),
|
||||
PS = rabbit_stomp_frame:initial_state(),
|
||||
process_received_bytes(Rest, State#reader_state{parse_state = PS})
|
||||
Resume(Rest)
|
||||
end.
|
||||
|
|
|
|||
|
|
@ -62,7 +62,8 @@ make_listener_specs(Listeners) ->
|
|||
{tcp_listener_sup, start_link,
|
||||
[IPAddress, Port,
|
||||
[{packet, raw},
|
||||
{reuseaddr, true}],
|
||||
{reuseaddr, true},
|
||||
{backlog, 128}],
|
||||
{?MODULE, listener_started, []},
|
||||
{?MODULE, listener_stopped, []},
|
||||
{?MODULE, start_client, []}, "STOMP Listener"]},
|
||||
|
|
|
|||
|
|
@ -35,6 +35,7 @@
|
|||
parse_message_id/1]).
|
||||
-export([longstr_field/2]).
|
||||
-export([ack_mode/1, consumer_tag/1, message_headers/4, message_properties/1]).
|
||||
-export([negotiate_version/2]).
|
||||
|
||||
-include_lib("amqp_client/include/amqp_client.hrl").
|
||||
-include("rabbit_stomp_frame.hrl").
|
||||
|
|
@ -44,6 +45,13 @@
|
|||
-define(EXCHANGE_PREFIX, "/exchange").
|
||||
|
||||
-define(MESSAGE_ID_SEPARATOR, "@@").
|
||||
-define(HEADER_CONTENT_TYPE, "content-type").
|
||||
-define(HEADER_CONTENT_ENCODING, "content-encoding").
|
||||
-define(HEADER_DELIVERY_MODE, "delivery-mode").
|
||||
-define(HEADER_PRIORITY, "priority").
|
||||
-define(HEADER_CORRELATION_ID, "correlation-id").
|
||||
-define(HEADER_REPLY_TO, "reply-to").
|
||||
-define(HEADER_AMQP_MESSAGE_ID, "amqp-message-id").
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Frame and Header Parsing
|
||||
|
|
@ -64,24 +72,25 @@ consumer_tag(Frame) ->
|
|||
|
||||
ack_mode(Frame) ->
|
||||
case rabbit_stomp_frame:header(Frame, "ack", "auto") of
|
||||
"auto" -> auto;
|
||||
"client" -> client
|
||||
"auto" -> {auto, false};
|
||||
"client" -> {client, true};
|
||||
"client-individual" -> {client, false}
|
||||
end.
|
||||
|
||||
message_properties(Frame = #stomp_frame{headers = Headers}) ->
|
||||
BinH = fun(K, V) -> rabbit_stomp_frame:binary_header(Frame, K, V) end,
|
||||
IntH = fun(K, V) -> rabbit_stomp_frame:integer_header(Frame, K, V) end,
|
||||
|
||||
|
||||
#'P_basic'{
|
||||
content_type = BinH("content-type", <<"text/plain">>),
|
||||
content_encoding = BinH("content-encoding", undefined),
|
||||
delivery_mode = IntH("delivery-mode", undefined),
|
||||
priority = IntH("priority", undefined),
|
||||
correlation_id = BinH("correlation-id", undefined),
|
||||
reply_to = BinH("reply-to", undefined),
|
||||
message_id = BinH("amqp-message-id", undefined),
|
||||
headers = [longstr_field(K, V) || {"X-" ++ K, V} <- Headers]}.
|
||||
content_type = BinH(?HEADER_CONTENT_TYPE, <<"text/plain">>),
|
||||
content_encoding = BinH(?HEADER_CONTENT_ENCODING, undefined),
|
||||
delivery_mode = IntH(?HEADER_DELIVERY_MODE, undefined),
|
||||
priority = IntH(?HEADER_PRIORITY, undefined),
|
||||
correlation_id = BinH(?HEADER_CORRELATION_ID, undefined),
|
||||
reply_to = BinH(?HEADER_REPLY_TO, undefined),
|
||||
message_id = BinH(?HEADER_AMQP_MESSAGE_ID, undefined),
|
||||
headers = [longstr_field(K, V) ||
|
||||
{K, V} <- Headers, user_header(K)]}.
|
||||
|
||||
message_headers(Destination, SessionId,
|
||||
#'basic.deliver'{consumer_tag = ConsumerTag,
|
||||
|
|
@ -101,8 +110,8 @@ message_headers(Destination, SessionId,
|
|||
{"message-id", create_message_id(ConsumerTag,
|
||||
SessionId,
|
||||
DeliveryTag)}]
|
||||
++ maybe_header("content-type", ContentType)
|
||||
++ maybe_header("content-encoding", ContentEncoding)
|
||||
++ maybe_header(?HEADER_CONTENT_TYPE, ContentType)
|
||||
++ maybe_header(?HEADER_CONTENT_ENCODING, ContentEncoding)
|
||||
++ case ConsumerTag of
|
||||
<<"Q_", _/binary>> -> [];
|
||||
<<"T_", Id/binary>> -> [{"subscription", binary_to_list(Id)}]
|
||||
|
|
@ -111,11 +120,24 @@ message_headers(Destination, SessionId,
|
|||
undefined -> [];
|
||||
_ -> Headers
|
||||
end)
|
||||
++ maybe_header("delivery-mode", DeliveryMode)
|
||||
++ maybe_header("priority", Priority)
|
||||
++ maybe_header("correlation-id", CorrelationId)
|
||||
++ maybe_header("reply-to", ReplyTo)
|
||||
++ maybe_header("amqp-message-id", MessageId).
|
||||
++ maybe_header(?HEADER_DELIVERY_MODE, DeliveryMode)
|
||||
++ maybe_header(?HEADER_PRIORITY, Priority)
|
||||
++ maybe_header(?HEADER_CORRELATION_ID, CorrelationId)
|
||||
++ maybe_header(?HEADER_REPLY_TO, ReplyTo)
|
||||
++ maybe_header(?HEADER_AMQP_MESSAGE_ID, MessageId).
|
||||
|
||||
user_header(Hdr)
|
||||
when Hdr =:= ?HEADER_CONTENT_TYPE orelse
|
||||
Hdr =:= ?HEADER_CONTENT_ENCODING orelse
|
||||
Hdr =:= ?HEADER_DELIVERY_MODE orelse
|
||||
Hdr =:= ?HEADER_PRIORITY orelse
|
||||
Hdr =:= ?HEADER_CORRELATION_ID orelse
|
||||
Hdr =:= ?HEADER_REPLY_TO orelse
|
||||
Hdr =:= ?HEADER_AMQP_MESSAGE_ID orelse
|
||||
Hdr =:= "destination" ->
|
||||
false;
|
||||
user_header(_) ->
|
||||
true.
|
||||
|
||||
parse_message_id(MessageId) ->
|
||||
{ok, Pieces} = regexp:split(MessageId, ?MESSAGE_ID_SEPARATOR),
|
||||
|
|
@ -128,6 +150,37 @@ parse_message_id(MessageId) ->
|
|||
{error, invalid_message_id}
|
||||
end.
|
||||
|
||||
negotiate_version(ClientVers, ServerVers) ->
|
||||
Common = lists:filter(fun(Ver) ->
|
||||
lists:member(Ver, ServerVers)
|
||||
end, ClientVers),
|
||||
case Common of
|
||||
[] ->
|
||||
{error, no_common_version};
|
||||
[H|T] ->
|
||||
{ok, lists:foldl(fun(Ver, AccN) ->
|
||||
max_version(Ver, AccN)
|
||||
end, H, T)}
|
||||
end.
|
||||
|
||||
max_version(V, V) ->
|
||||
V;
|
||||
max_version(V1, V2) ->
|
||||
Split = fun(X) -> re:split(X, "\\.", [{return, list}]) end,
|
||||
find_max_version({V1, Split(V1)}, {V2, Split(V2)}).
|
||||
|
||||
find_max_version({V1, [X|T1]}, {V2, [X|T2]}) ->
|
||||
find_max_version({V1, T1}, {V2, T2});
|
||||
find_max_version({V1, [X]}, {V2, [Y]}) ->
|
||||
case list_to_integer(X) >= list_to_integer(Y) of
|
||||
true -> V1;
|
||||
false -> V2
|
||||
end;
|
||||
find_max_version({_V1, []}, {V2, Y}) when length(Y) > 0 ->
|
||||
V2;
|
||||
find_max_version({V1, X}, {_V2, []}) when length(X) > 0 ->
|
||||
V1.
|
||||
|
||||
%% ---- Header processing helpers ----
|
||||
|
||||
longstr_field(K, V) ->
|
||||
|
|
@ -144,9 +197,9 @@ maybe_header(_Key, _Value) ->
|
|||
|
||||
adhoc_convert_headers(Headers) ->
|
||||
lists:foldr(fun ({K, longstr, V}, Acc) ->
|
||||
[{"X-" ++ binary_to_list(K), binary_to_list(V)} | Acc];
|
||||
[{binary_to_list(K), binary_to_list(V)} | Acc];
|
||||
({K, signedint, V}, Acc) ->
|
||||
[{"X-" ++ binary_to_list(K), integer_to_list(V)} | Acc];
|
||||
[{binary_to_list(K), integer_to_list(V)} | Acc];
|
||||
(_, Acc) ->
|
||||
Acc
|
||||
end, [], Headers).
|
||||
|
|
|
|||
|
|
@ -9,11 +9,13 @@ class TestAck(base.BaseTest):
|
|||
d = "/queue/ack-test"
|
||||
|
||||
# subscribe and send message
|
||||
self.listener.reset()
|
||||
self.conn.subscribe(destination=d, ack='client')
|
||||
self.conn.send("test", destination=d)
|
||||
self.assertTrue(self.listener.await(3), "initial message not received")
|
||||
self.assertEquals(1, len(self.listener.messages))
|
||||
self.listener.reset(2) ## expecting 2 messages
|
||||
self.conn.subscribe(destination=d, ack='client',
|
||||
headers={'prefetch-count': '10'})
|
||||
self.conn.send("test1", destination=d)
|
||||
self.conn.send("test2", destination=d)
|
||||
self.assertTrue(self.listener.await(4), "initial message not received")
|
||||
self.assertEquals(2, len(self.listener.messages))
|
||||
|
||||
# disconnect with no ack
|
||||
self.conn.disconnect()
|
||||
|
|
@ -22,13 +24,15 @@ class TestAck(base.BaseTest):
|
|||
conn2 = self.create_connection()
|
||||
try:
|
||||
listener2 = base.WaitableListener()
|
||||
listener2.reset(2)
|
||||
conn2.set_listener('', listener2)
|
||||
conn2.subscribe(destination=d, ack='client')
|
||||
conn2.subscribe(destination=d, ack='client',
|
||||
headers={'prefetch-count': '10'})
|
||||
self.assertTrue(listener2.await(), "message not received again")
|
||||
self.assertEquals(1, len(listener2.messages))
|
||||
self.assertEquals(2, len(listener2.messages))
|
||||
|
||||
# now ack
|
||||
mid = listener2.messages[0]['headers']['message-id']
|
||||
# now ack only the last message - expecting cumulative behaviour
|
||||
mid = listener2.messages[1]['headers']['message-id']
|
||||
conn2.ack({'message-id':mid})
|
||||
finally:
|
||||
conn2.stop()
|
||||
|
|
@ -44,6 +48,50 @@ class TestAck(base.BaseTest):
|
|||
finally:
|
||||
conn3.stop()
|
||||
|
||||
def test_ack_client_individual(self):
|
||||
d = "/queue/ack-test-individual"
|
||||
|
||||
# subscribe and send message
|
||||
self.listener.reset(2) ## expecting 2 messages
|
||||
self.conn.subscribe(destination=d, ack='client-individual',
|
||||
headers={'prefetch-count': '10'})
|
||||
self.conn.send("test1", destination=d)
|
||||
self.conn.send("test2", destination=d)
|
||||
self.assertTrue(self.listener.await(4), "initial message not received")
|
||||
self.assertEquals(2, len(self.listener.messages))
|
||||
|
||||
# disconnect with no ack
|
||||
self.conn.disconnect()
|
||||
|
||||
# now reconnect
|
||||
conn2 = self.create_connection()
|
||||
try:
|
||||
listener2 = base.WaitableListener()
|
||||
listener2.reset(2)
|
||||
conn2.set_listener('', listener2)
|
||||
conn2.subscribe(destination=d, ack='client-individual',
|
||||
headers={'prefetch-count': '10'})
|
||||
self.assertTrue(listener2.await(), "message not received again")
|
||||
self.assertEquals(2, len(listener2.messages))
|
||||
|
||||
# now ack only the last message - expecting individual behaviour
|
||||
mid = listener2.messages[1]['headers']['message-id']
|
||||
conn2.ack({'message-id':mid})
|
||||
finally:
|
||||
conn2.stop()
|
||||
|
||||
# now reconnect again, shouldn't see the message
|
||||
conn3 = self.create_connection()
|
||||
try:
|
||||
listener3 = base.WaitableListener()
|
||||
conn3.set_listener('', listener3)
|
||||
conn3.subscribe(destination=d)
|
||||
self.assertTrue(listener3.await(3),
|
||||
"Expected to see a message. ACK not working?")
|
||||
self.assertEquals("test1", listener3.messages[0]['message'])
|
||||
finally:
|
||||
conn3.stop()
|
||||
|
||||
def test_ack_client_tx(self):
|
||||
d = "/queue/ack-test-tx"
|
||||
|
||||
|
|
@ -88,3 +136,17 @@ class TestAck(base.BaseTest):
|
|||
finally:
|
||||
conn3.stop()
|
||||
|
||||
def test_topic_prefetch(self):
|
||||
d = "/topic/prefetch-test"
|
||||
|
||||
# subscribe and send message
|
||||
self.listener.reset(6) ## expecting 6 messages
|
||||
self.conn.subscribe(destination=d, ack='client',
|
||||
headers={'prefetch-count': '5'})
|
||||
|
||||
for x in range(10):
|
||||
self.conn.send("test" + str(x), destination=d)
|
||||
|
||||
self.assertFalse(self.listener.await(5),
|
||||
"Should not have been able to see 6 messages")
|
||||
self.assertEquals(5, len(self.listener.messages))
|
||||
|
|
|
|||
|
|
@ -6,8 +6,9 @@ import threading
|
|||
|
||||
class BaseTest(unittest.TestCase):
|
||||
|
||||
def create_connection(self):
|
||||
conn = stomp.Connection(user="guest", passcode="guest")
|
||||
def create_connection(self, version=None, heartbeat=None):
|
||||
conn = stomp.Connection(user="guest", passcode="guest",
|
||||
version=version, heartbeat=heartbeat)
|
||||
conn.start()
|
||||
conn.connect()
|
||||
return conn
|
||||
|
|
@ -96,8 +97,11 @@ class Latch(object):
|
|||
def await(self, timeout=None):
|
||||
try:
|
||||
self.cond.acquire()
|
||||
self.cond.wait(timeout)
|
||||
return self.count == 0
|
||||
if self.count == 0:
|
||||
return True
|
||||
else:
|
||||
self.cond.wait(timeout)
|
||||
return self.count == 0
|
||||
finally:
|
||||
self.cond.release()
|
||||
|
||||
|
|
|
|||
|
|
@ -40,6 +40,46 @@ class TestLifecycle(base.BaseTest):
|
|||
self.assertFalse(self.listener.await(3),
|
||||
"UNSUBSCRIBE failed, still receiving messages")
|
||||
|
||||
def test_connect_version_1_1(self):
|
||||
self.conn.disconnect()
|
||||
new_conn = self.create_connection(version="1.1,1.0")
|
||||
try:
|
||||
self.assertTrue(new_conn.is_connected())
|
||||
finally:
|
||||
new_conn.disconnect()
|
||||
|
||||
def test_heartbeat_disconnects_client(self):
|
||||
self.conn.disconnect()
|
||||
new_conn = self.create_connection(heartbeat="1500,0")
|
||||
try:
|
||||
self.assertTrue(new_conn.is_connected())
|
||||
time.sleep(1)
|
||||
self.assertTrue(new_conn.is_connected())
|
||||
time.sleep(3)
|
||||
self.assertFalse(new_conn.is_connected())
|
||||
finally:
|
||||
if new_conn.is_connected():
|
||||
new_conn.disconnect()
|
||||
|
||||
|
||||
|
||||
def test_unsupported_version(self):
|
||||
self.conn.disconnect()
|
||||
new_conn = stomp.Connection(user="guest",
|
||||
passcode="guest",
|
||||
version="100.1")
|
||||
listener = base.WaitableListener()
|
||||
new_conn.set_listener('', listener)
|
||||
try:
|
||||
new_conn.start()
|
||||
new_conn.connect()
|
||||
self.assertTrue(listener.await())
|
||||
self.assertEquals("Supported versions are 1.0,1.1\n",
|
||||
listener.errors[0]['message'])
|
||||
finally:
|
||||
if new_conn.is_connected():
|
||||
new_conn.disconnect()
|
||||
|
||||
def test_disconnect(self):
|
||||
''' Run DISCONNECT command '''
|
||||
self.conn.disconnect()
|
||||
|
|
|
|||
|
|
@ -18,6 +18,8 @@ def connect(cnames):
|
|||
'\n\0')
|
||||
resp = ('CONNECTED\n'
|
||||
'session:(.*)\n'
|
||||
'heartbeat:0,0\n'
|
||||
'version:1.0\n'
|
||||
'\n\x00')
|
||||
def w(m):
|
||||
@functools.wraps(m)
|
||||
|
|
@ -103,6 +105,7 @@ class TestParsing(unittest.TestCase):
|
|||
resp = ('ERROR\n'
|
||||
'message:Bad command\n'
|
||||
'content-type:text/plain\n'
|
||||
'version:1.0,1.1\n'
|
||||
'content-length:41\n'
|
||||
'\n'
|
||||
'Could not interpret command WRONGCOMMAND\n'
|
||||
|
|
|
|||
|
|
@ -41,8 +41,8 @@ message_properties_test() ->
|
|||
{"correlation-id", "123"},
|
||||
{"reply-to", "something"},
|
||||
{"amqp-message-id", "M123"},
|
||||
{"X-str", "foo"},
|
||||
{"X-int", "123"}
|
||||
{"str", "foo"},
|
||||
{"int", "123"}
|
||||
],
|
||||
|
||||
#'P_basic'{
|
||||
|
|
@ -91,27 +91,62 @@ message_headers_test() ->
|
|||
{"correlation-id", "123"},
|
||||
{"reply-to", "something"},
|
||||
{"amqp-message-id", "M123"},
|
||||
{"X-str", "foo"},
|
||||
{"X-int", "123"}
|
||||
{"str", "foo"},
|
||||
{"int", "123"}
|
||||
],
|
||||
|
||||
[] = lists:subtract(Headers, Expected).
|
||||
|
||||
negotiate_version_both_empty_test() ->
|
||||
{error, no_common_version} = rabbit_stomp_util:negotiate_version([],[]).
|
||||
|
||||
negotiate_version_no_common_test() ->
|
||||
{error, no_common_version} =
|
||||
rabbit_stomp_util:negotiate_version(["1.2"],["1.3"]).
|
||||
|
||||
negotiate_version_simple_common_test() ->
|
||||
{ok, "1.2"} =
|
||||
rabbit_stomp_util:negotiate_version(["1.2"],["1.2"]).
|
||||
|
||||
negotiate_version_two_choice_common_test() ->
|
||||
{ok, "1.3"} =
|
||||
rabbit_stomp_util:negotiate_version(["1.2", "1.3"],["1.2", "1.3"]).
|
||||
|
||||
negotiate_version_two_choice_common_out_of_order_test() ->
|
||||
{ok, "1.3"} =
|
||||
rabbit_stomp_util:negotiate_version(["1.3", "1.2"],["1.2", "1.3"]).
|
||||
|
||||
negotiate_version_two_choice_big_common_test() ->
|
||||
{ok, "1.20.23"} =
|
||||
rabbit_stomp_util:negotiate_version(["1.20.23", "1.30.456"],
|
||||
["1.20.23", "1.30.457"]).
|
||||
negotiate_version_choice_mismatched_length_test() ->
|
||||
{ok, "1.2.3"} =
|
||||
rabbit_stomp_util:negotiate_version(["1.2", "1.2.3"],
|
||||
["1.2.3", "1.2"]).
|
||||
negotiate_version_choice_duplicates_test() ->
|
||||
{ok, "1.2"} =
|
||||
rabbit_stomp_util:negotiate_version(["1.2", "1.2"],
|
||||
["1.2", "1.2"]).
|
||||
%%--------------------------------------------------------------------
|
||||
%% Frame Parsing Tests
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
ack_mode_auto_test() ->
|
||||
Frame = #stomp_frame{headers = [{"ack", "auto"}]},
|
||||
auto = rabbit_stomp_util:ack_mode(Frame).
|
||||
{auto, _} = rabbit_stomp_util:ack_mode(Frame).
|
||||
|
||||
ack_mode_auto_default_test() ->
|
||||
Frame = #stomp_frame{headers = []},
|
||||
auto = rabbit_stomp_util:ack_mode(Frame).
|
||||
{auto, _} = rabbit_stomp_util:ack_mode(Frame).
|
||||
|
||||
ack_mode_client_test() ->
|
||||
Frame = #stomp_frame{headers = [{"ack", "client"}]},
|
||||
client = rabbit_stomp_util:ack_mode(Frame).
|
||||
{client, true} = rabbit_stomp_util:ack_mode(Frame).
|
||||
|
||||
ack_mode_client_individual_test() ->
|
||||
Frame = #stomp_frame{headers = [{"ack", "client-individual"}]},
|
||||
{client, false} = rabbit_stomp_util:ack_mode(Frame).
|
||||
|
||||
consumer_tag_id_test() ->
|
||||
Frame = #stomp_frame{headers = [{"id", "foo"}]},
|
||||
|
|
|
|||
Loading…
Reference in New Issue