2010-10-28 05:14:19 +08:00
|
|
|
%% The contents of this file are subject to the Mozilla Public License
|
|
|
|
|
%% Version 1.1 (the "License"); you may not use this file except in
|
|
|
|
|
%% compliance with the License. You may obtain a copy of the License at
|
|
|
|
|
%% http://www.mozilla.org/MPL/
|
|
|
|
|
%%
|
|
|
|
|
%% Software distributed under the License is distributed on an "AS IS"
|
|
|
|
|
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
|
|
|
|
|
%% License for the specific language governing rights and limitations
|
|
|
|
|
%% under the License.
|
|
|
|
|
%%
|
|
|
|
|
%% The Original Code is RabbitMQ.
|
|
|
|
|
%%
|
|
|
|
|
%% The Initial Developers of the Original Code are LShift Ltd,
|
|
|
|
|
%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
|
|
|
|
|
%%
|
|
|
|
|
%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
|
|
|
|
|
%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
|
|
|
|
|
%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
|
|
|
|
|
%% Technologies LLC, and Rabbit Technologies Ltd.
|
|
|
|
|
%%
|
|
|
|
|
%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
|
|
|
|
|
%% Ltd. Portions created by Cohesive Financial Technologies LLC are
|
|
|
|
|
%% Copyright (C) 2007-2009 Cohesive Financial Technologies
|
|
|
|
|
%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
|
|
|
|
|
%% (C) 2007-2009 Rabbit Technologies Ltd.
|
|
|
|
|
%%
|
|
|
|
|
%% All Rights Reserved.
|
|
|
|
|
%%
|
|
|
|
|
%% Contributor(s): ______________________________________.
|
|
|
|
|
%%
|
|
|
|
|
-module(rabbit_stomp_processor).
|
2011-01-31 21:00:15 +08:00
|
|
|
-behaviour(gen_server2).
|
2010-10-28 05:14:19 +08:00
|
|
|
|
2011-02-10 00:35:03 +08:00
|
|
|
-export([start_link/2, process_frame/2]).
|
2010-10-28 05:14:19 +08:00
|
|
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
|
|
|
|
code_change/3, terminate/2]).
|
|
|
|
|
|
|
|
|
|
-include_lib("amqp_client/include/amqp_client.hrl").
|
|
|
|
|
-include("rabbit_stomp_frame.hrl").
|
|
|
|
|
|
2010-11-10 03:10:56 +08:00
|
|
|
-record(state, {socket, session_id, channel,
|
|
|
|
|
connection, subscriptions, version,
|
2011-02-15 23:01:50 +08:00
|
|
|
start_heartbeat_fun, confirm_enabled}).
|
2010-11-09 19:57:20 +08:00
|
|
|
|
2011-01-19 00:24:42 +08:00
|
|
|
-record(subscription, {dest_hdr, channel, multi_ack, description}).
|
2010-11-09 22:12:59 +08:00
|
|
|
|
2010-11-09 19:57:20 +08:00
|
|
|
-define(SUPPORTED_VERSIONS, ["1.0", "1.1"]).
|
2010-11-09 22:12:59 +08:00
|
|
|
-define(DEFAULT_QUEUE_PREFETCH, 1).
|
2011-02-01 19:30:50 +08:00
|
|
|
-define(FLUSH_TIMEOUT, 60000).
|
2010-10-28 05:14:19 +08:00
|
|
|
|
|
|
|
|
%%----------------------------------------------------------------------------
|
|
|
|
|
%% Public API
|
|
|
|
|
%%----------------------------------------------------------------------------
|
2010-11-11 05:09:20 +08:00
|
|
|
%% Starts a processor linked to the caller. Both arguments are handed
%% to init/1 unchanged: Sock is stored as the state's socket and
%% StartHeartbeatFun as its start_heartbeat_fun.
start_link(Sock, StartHeartbeatFun) ->
    InitArgs = [Sock, StartHeartbeatFun],
    gen_server2:start_link(?MODULE, InitArgs, []).
|
2010-10-28 05:14:19 +08:00
|
|
|
|
|
|
|
|
%% Asynchronously hands a parsed frame to the processor Pid. The cast
%% message is {Command, Frame}, which is the shape handle_cast/2
%% dispatches on.
process_frame(Pid, #stomp_frame{command = Command} = Frame) ->
    Request = {Command, Frame},
    gen_server2:cast(Pid, Request).
|
2010-10-28 05:14:19 +08:00
|
|
|
|
|
|
|
|
%%----------------------------------------------------------------------------
|
2011-01-31 21:00:15 +08:00
|
|
|
%% Basic gen_server2 callbacks
|
2010-10-28 05:14:19 +08:00
|
|
|
%%----------------------------------------------------------------------------
|
|
|
|
|
|
2010-11-11 05:09:20 +08:00
|
|
|
%% gen_server2 callback. Traps exits and starts in the "not logged in"
%% state: no session, channel or connection, no subscriptions, no
%% negotiated version and publisher confirms disabled. The process
%% hibernates immediately, using the backoff settings given below.
init([Sock, StartHeartbeatFun]) ->
    process_flag(trap_exit, true),
    InitialState = #state{socket              = Sock,
                          session_id          = none,
                          channel             = none,
                          connection          = none,
                          subscriptions       = dict:new(),
                          version             = none,
                          start_heartbeat_fun = StartHeartbeatFun,
                          confirm_enabled     = false},
    Backoff = {backoff, 1000, 1000, 10000},
    {ok, InitialState, hibernate, Backoff}.
|
|
|
|
|
|
2011-01-20 22:22:25 +08:00
|
|
|
%% gen_server2 callback. Whatever the stop reason, the AMQP channels
%% and the connection recorded in the state are shut down.
terminate(_Reason, FinalState) ->
    shutdown_channel_and_connection(FinalState).
|
|
|
|
|
|
2011-01-11 22:12:58 +08:00
|
|
|
%% gen_server2 callback. Receives {Command, Frame} casts produced by
%% process_frame/2 and the client_timeout cast sent by the heartbeat
%% receive callback (see ensure_heartbeats/2).
%%
%% Clause order matters: while the state has no channel only CONNECT
%% (or STOMP, which is forwarded to CONNECT) is accepted; every other
%% cast is answered with an ERROR frame.
handle_cast({"STOMP", Frame}, State) ->
    handle_cast({"CONNECT", Frame}, State);

handle_cast({"CONNECT", Frame}, #state{channel = none} = State) ->
    Connect =
        fun(LoginState) ->
                case negotiate_version(Frame) of
                    {error, no_common_version} ->
                        error("Version mismatch",
                              "Supported versions are ~s\n",
                              [string:join(?SUPPORTED_VERSIONS, ",")],
                              LoginState);
                    {ok, Version} ->
                        {ok, DefaultVHost} =
                            application:get_env(rabbit, default_vhost),
                        Login     = rabbit_stomp_frame:header(Frame, "login"),
                        Passcode  = rabbit_stomp_frame:header(Frame,
                                                              "passcode"),
                        %% The vhost falls back to the broker default.
                        VHost     = rabbit_stomp_frame:header(
                                      Frame, "host",
                                      binary_to_list(DefaultVHost)),
                        Heartbeat = rabbit_stomp_frame:header(
                                      Frame, "heartbeat", "0,0"),
                        do_login(Login, Passcode, VHost, Heartbeat,
                                 Version, LoginState)
                end
        end,
    %% CONNECT has no success hook: the state is passed through as is.
    process_request(Connect, fun(S) -> S end, State);

handle_cast(_Request, #state{channel = none} = State) ->
    NewState = send_error("Illegal command",
                          "You must log in using CONNECT first\n",
                          State),
    {noreply, NewState, hibernate};

handle_cast({Command, Frame}, State) ->
    Handle  = fun(S) -> handle_frame(Command, Frame, S) end,
    %% On success, send a RECEIPT if the frame asked for one.
    Receipt = fun(S) -> ensure_receipt(Frame, S) end,
    process_request(Handle, Receipt, State);

handle_cast(client_timeout, State) ->
    {stop, client_timeout, State}.
|
2010-10-28 05:14:19 +08:00
|
|
|
|
|
|
|
|
%% gen_server2 callback for plain messages:
%%   * basic.consume_ok / basic.cancel_ok are acknowledged silently;
%%   * a {basic.deliver, amqp_msg} pair is forwarded to the client as a
%%     MESSAGE frame via send_delivery/4;
%%   * {inet_reply, _, ok} is ignored, any other inet_reply status stops
%%     the process with that status as the reason.
%% There is deliberately no catch-all clause here; other messages are
%% not handled by this function.
handle_info(#'basic.consume_ok'{}, State) ->
    {noreply, State, hibernate};
handle_info(#'basic.cancel_ok'{}, State) ->
    {noreply, State, hibernate};
handle_info({#'basic.deliver'{} = Delivery,
             #amqp_msg{props = Props, payload = Payload}}, State) ->
    NewState = send_delivery(Delivery, Props, Payload, State),
    {noreply, NewState, hibernate};
handle_info({inet_reply, _Sock, Result}, State) ->
    case Result of
        ok     -> {noreply, State, hibernate};
        Status -> {stop, Status, State}
    end.
|
2010-10-28 05:14:19 +08:00
|
|
|
|
2010-11-26 19:10:43 +08:00
|
|
|
%% Runs ProcessFun(State) and turns its outcome into a gen_server2
%% callback result.
%%
%% ProcessFun is expected to return one of:
%%   {ok, Frame, NewState}             - Frame (unless 'none') is sent to
%%                                       the client, then SuccessFun is
%%                                       applied to NewState;
%%   {error, Message, Detail, NewState} - an ERROR frame is sent;
%%   {stop, Reason, NewState}          - the server stops.
%%
%% An exit carrying {server_initiated_close, Code, Text} is reported
%% through amqp_death/3; any other 'EXIT' becomes a generic
%% "Processing error" with the reason logged server-side only.
process_request(ProcessFun, SuccessFun, State) ->
    Res = case catch ProcessFun(State) of
              {'EXIT',
               {{server_initiated_close, ReplyCode, Explanation}, _}} ->
                  amqp_death(ReplyCode, Explanation, State);
              {'EXIT', Reason} ->
                  priv_error("Processing error", "Processing error\n",
                             Reason, State);
              Result ->
                  Result
          end,
    case Res of
        {ok, Frame, NewState} ->
            case Frame of
                none -> ok;
                _    -> send_frame(Frame, NewState)
            end,
            {noreply, SuccessFun(NewState), hibernate};
        {error, Message, Detail, NewState} ->
            {noreply, send_error(Message, Detail, NewState), hibernate};
        %% Bind a fresh variable here: matching against the already
        %% bound State would raise case_clause whenever the stop result
        %% carries a state that differs from the one passed in.
        {stop, StopReason, StopState} ->
            {stop, StopReason, StopState}
    end.
|
|
|
|
|
|
2010-10-28 05:14:19 +08:00
|
|
|
%%----------------------------------------------------------------------------
|
|
|
|
|
%% Frame handlers
|
|
|
|
|
%%----------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
%% Handles a single frame from a logged-in client by delegating to the
%% helper for its command. The result is whatever the helper returns
%% ({ok, ...}, {error, ...} or {stop, ...}) and is interpreted by
%% process_request/3. Unknown commands produce a "Bad command" error.
handle_frame("DISCONNECT", _Frame, State) ->
    %% Channels and connection are closed later, in terminate/2.
    {stop, normal, State};

handle_frame("SUBSCRIBE", Frame, State) ->
    with_destination("SUBSCRIBE", Frame, State, fun do_subscribe/4);

handle_frame("UNSUBSCRIBE", Frame, State) ->
    cancel_subscription(rabbit_stomp_util:consumer_tag(Frame), State);

handle_frame("SEND", Frame, State) ->
    %% Confirms may get switched on before the message is published.
    ConfirmState = ensure_confirm(Frame, State),
    with_destination("SEND", Frame, ConfirmState, fun do_send/4);

handle_frame("ACK", Frame, State) ->
    ack_action("ACK", Frame, State, fun create_ack_method/2);

handle_frame("NACK", Frame, State) ->
    ack_action("NACK", Frame, State, fun create_nack_method/2);

handle_frame("BEGIN", Frame, State) ->
    transactional_action(Frame, "BEGIN", fun begin_transaction/2, State);

handle_frame("COMMIT", Frame, State) ->
    transactional_action(Frame, "COMMIT", fun commit_transaction/2, State);

handle_frame("ABORT", Frame, State) ->
    transactional_action(Frame, "ABORT", fun abort_transaction/2, State);

handle_frame(Command, _Frame, State) ->
    Detail = lists:append(["Could not interpret command ", Command, "\n"]),
    error("Bad command", Detail, State).
|
2010-10-28 05:14:19 +08:00
|
|
|
|
|
|
|
|
%%----------------------------------------------------------------------------
|
|
|
|
|
%% Internal helpers for processing frames callbacks
|
|
|
|
|
%%----------------------------------------------------------------------------
|
|
|
|
|
|
2011-01-11 21:57:59 +08:00
|
|
|
%% Shared implementation of ACK and NACK.
%%
%% Command   - "ACK" or "NACK"; only used in error details.
%% Frame     - the client frame; must carry a parseable 'message-id'.
%% MethodFun - fun(DeliveryTag, Subscription) building the AMQP method
%%             (see create_ack_method/2 and create_nack_method/2).
%%
%% Returns {ok, none, State} on success, or an {error, ...} tuple when
%% the header is missing or invalid, or when it names a consumer tag
%% that has no current subscription.
ack_action(Command, Frame, State = #state{}, MethodFun) ->
    case rabbit_stomp_frame:header(Frame, "message-id") of
        {ok, IdStr} ->
            case rabbit_stomp_util:parse_message_id(IdStr) of
                {ok, {ConsumerTag, _SessionId, DeliveryTag}} ->
                    ack_delivery(ConsumerTag, DeliveryTag, IdStr,
                                 Frame, State, MethodFun);
                _ ->
                    error("Invalid message-id",
                          "~p must include a valid 'message-id' header\n",
                          [Command],
                          State)
            end;
        not_found ->
            error("Missing message-id",
                  "~p must include a 'message-id' header\n",
                  [Command],
                  State)
    end.

%% Looks up the subscription the delivery belongs to and either issues
%% the method on that subscription's channel or, for a transactional
%% frame, queues {Channel, Method} on the transaction. dict:find/2 is
%% used so that an unknown consumer tag (for instance after an
%% UNSUBSCRIBE) yields a specific error instead of a crash that would
%% be reported as a generic "Processing error".
ack_delivery(ConsumerTag, DeliveryTag, IdStr, Frame,
             State = #state{subscriptions = Subs}, MethodFun) ->
    case dict:find(ConsumerTag, Subs) of
        {ok, Subscription = #subscription{channel = SubChannel}} ->
            Method = MethodFun(DeliveryTag, Subscription),
            case transactional(Frame) of
                {yes, Transaction} ->
                    extend_transaction(Transaction,
                                       {SubChannel, Method},
                                       State);
                no ->
                    amqp_channel:call(SubChannel, Method),
                    ok(State)
            end;
        error ->
            error("Subscription not found",
                  "Message with id ~p has no subscription\n",
                  [IdStr],
                  State)
    end.
|
|
|
|
|
|
2010-10-28 05:14:19 +08:00
|
|
|
%%----------------------------------------------------------------------------
|
|
|
|
|
%% Internal helpers for processing frames callbacks
|
|
|
|
|
%%----------------------------------------------------------------------------
|
|
|
|
|
|
2011-01-18 20:35:53 +08:00
|
|
|
%% Implements UNSUBSCRIBE. The first argument is the result of
%% rabbit_stomp_util:consumer_tag/1 for the frame.
%%
%% On a successful basic.cancel the subscription is removed from the
%% state and its channel is closed unless it is the main channel.
%% Every failure is reported as an {error, ...} tuple.
cancel_subscription({error, _}, State) ->
    error("Missing destination or id",
          "UNSUBSCRIBE must include a 'destination' or 'id' header\n",
          State);

cancel_subscription({ok, ConsumerTag, Description},
                    #state{channel       = MainChannel,
                           subscriptions = Subs} = State) ->
    case dict:find(ConsumerTag, Subs) of
        {ok, #subscription{channel = SubChannel}} ->
            Cancel = #'basic.cancel'{consumer_tag = ConsumerTag},
            case amqp_channel:call(SubChannel, Cancel) of
                %% The reply must name the tag we asked to cancel.
                #'basic.cancel_ok'{consumer_tag = ConsumerTag} ->
                    Remaining = dict:erase(ConsumerTag, Subs),
                    ensure_subchannel_closed(
                      SubChannel, MainChannel,
                      State#state{subscriptions = Remaining});
                _ ->
                    error("Failed to cancel subscription",
                          "UNSUBSCRIBE to ~p failed.\n",
                          [Description],
                          State)
            end;
        error ->
            error("No subscription found",
                  "UNSUBSCRIBE must refer to an existing subscription.\n"
                  "Subscription to ~p not found.\n",
                  [Description],
                  State)
    end.
|
|
|
|
|
|
2011-01-20 17:37:57 +08:00
|
|
|
%% Closes a subscription's channel unless it is the session's main
%% channel, which must stay open. Always returns {ok, none, State}.
ensure_subchannel_closed(SubChannel, MainChannel, State) ->
    case SubChannel == MainChannel of
        true  -> ok;
        false -> amqp_channel:close(SubChannel)
    end,
    ok(State).
|
2011-01-10 21:35:43 +08:00
|
|
|
|
2011-02-15 23:01:50 +08:00
|
|
|
%% Switches the main channel into confirm mode the first time a frame
%% carrying a 'receipt' header is seen, and records that in the state
%% so that confirm.select is sent at most once. Frames without a
%% 'receipt' header leave the state untouched.
ensure_confirm(_Frame, State = #state{confirm_enabled = true}) ->
    State;
ensure_confirm(Frame, State = #state{channel = Channel}) ->
    case rabbit_stomp_frame:header(Frame, "receipt") of
        {ok, _} ->
            %% The channel comes first and the method second, as in
            %% every other amqp_channel call in this module; the two
            %% arguments used to be swapped here.
            amqp_channel:cast(Channel, #'confirm.select'{}),
            State#state{confirm_enabled = true};
        not_found ->
            State
    end.
|
|
|
|
|
|
2010-10-28 05:14:19 +08:00
|
|
|
%% Extracts and parses the frame's 'destination' header, then calls
%% Fun(Destination, DestHdr, Frame, State). A missing, invalid or
%% unknown destination is reported as an {error, ...} tuple; Command is
%% only used in the "missing" detail text.
with_destination(Command, Frame, State, Fun) ->
    case rabbit_stomp_frame:header(Frame, "destination") of
        not_found ->
            error("Missing destination",
                  "~p must include a 'destination' header\n",
                  [Command],
                  State);
        {ok, DestHdr} ->
            case rabbit_stomp_util:parse_destination(DestHdr) of
                {ok, Destination} ->
                    Fun(Destination, DestHdr, Frame, State);
                {error, {invalid_destination, Type, Content}} ->
                    error("Invalid destination",
                          "'~s' is not a valid ~p destination\n",
                          [Content, Type],
                          State);
                {error, {unknown_destination, Content}} ->
                    error("Unknown destination",
                          "'~s' is not a valid destination.\n"
                          "Valid destination types are: "
                          "/exchange, /topic or /queue.\n",
                          [Content],
                          State)
            end
    end.
|
|
|
|
|
|
2010-11-12 15:23:02 +08:00
|
|
|
%% Performs the CONNECT handshake. Login and Passcode arrive as header
%% lookup results; both must be {ok, Value}, otherwise a "Bad CONNECT"
%% error is returned.
%%
%% On success a direct AMQP connection and a main channel are opened,
%% a session id is generated, heartbeats are set up, and a CONNECTED
%% frame is returned together with the state holding the new session.
%% Failed matches on the connection or channel results crash here; the
%% caller (process_request/3) catches such crashes.
do_login({ok, Login}, {ok, Passcode}, VirtualHost, Heartbeat, Version, State) ->
    Params = #amqp_params{username     = list_to_binary(Login),
                          password     = list_to_binary(Passcode),
                          virtual_host = list_to_binary(VirtualHost)},
    {ok, Connection} = amqp_connection:start(direct, Params),
    {ok, Channel} = amqp_connection:open_channel(Connection),
    SessionId = rabbit_guid:string_guid("session"),

    {{SendTimeout, ReceiveTimeout}, HeartbeatState} =
        ensure_heartbeats(Heartbeat, State),

    Headers = [{"session", SessionId},
               {"heartbeat",
                io_lib:format("~B,~B", [SendTimeout, ReceiveTimeout])},
               {"version", Version}],
    LoggedIn = HeartbeatState#state{session_id = SessionId,
                                    channel    = Channel,
                                    connection = Connection},
    ok("CONNECTED", Headers, "", LoggedIn);

do_login(_, _, _, _, _, State) ->
    error("Bad CONNECT", "Missing login or passcode header(s)\n", State).
|
2010-10-28 05:14:19 +08:00
|
|
|
|
|
|
|
|
%% Implements SUBSCRIBE for an already parsed destination.
%%
%% Steps, in order:
%%   1. pick the channel: when a prefetch count applies (from the
%%      'prefetch-count' header or default_prefetch/1) a dedicated
%%      channel is opened and basic.qos is applied to it; otherwise the
%%      main channel is used;
%%   2. make sure the queue exists;
%%   3. start consuming, with this process as the subscriber;
%%   4. bind the queue according to the destination;
%%   5. remember the subscription under its consumer tag.
do_subscribe(Destination, DestHdr, Frame,
             #state{subscriptions = Subs,
                    connection    = Connection,
                    channel       = MainChannel} = State) ->
    Prefetch = rabbit_stomp_frame:integer_header(
                 Frame, "prefetch-count", default_prefetch(Destination)),

    Channel =
        case Prefetch of
            undefined ->
                MainChannel;
            _ ->
                {ok, QosChannel} = amqp_connection:open_channel(Connection),
                Qos = #'basic.qos'{prefetch_size  = 0,
                                   prefetch_count = Prefetch,
                                   global         = false},
                amqp_channel:call(QosChannel, Qos),
                QosChannel
        end,

    {AckMode, IsMulti} = rabbit_stomp_util:ack_mode(Frame),

    {ok, Queue} = ensure_queue(subscribe, Destination, Channel),

    {ok, ConsumerTag, Description} = rabbit_stomp_util:consumer_tag(Frame),

    Consume = #'basic.consume'{queue        = Queue,
                               consumer_tag = ConsumerTag,
                               no_local     = false,
                               %% 'auto' ack mode consumes without acks.
                               no_ack       = (AckMode == auto),
                               exclusive    = false},
    amqp_channel:subscribe(Channel, Consume, self()),

    ExchangeAndKey = rabbit_stomp_util:parse_routing_information(Destination),
    ok = ensure_queue_binding(Queue, ExchangeAndKey, Channel),

    Subscription = #subscription{dest_hdr    = DestHdr,
                                 channel     = Channel,
                                 multi_ack   = IsMulti,
                                 description = Description},
    NewSubs = dict:store(ConsumerTag, Subscription, Subs),
    ok(State#state{subscriptions = NewSubs}).
|
2010-10-28 05:14:19 +08:00
|
|
|
|
|
|
|
|
%% Implements SEND for an already parsed destination: ensures the queue
%% (where the destination type calls for one), builds a basic.publish
%% from the destination's routing information, and either publishes it
%% on the main channel or, for a transactional frame, queues it on the
%% transaction.
do_send(Destination, _DestHdr,
        #stomp_frame{body_iolist = BodyFragments} = Frame,
        #state{channel = Channel} = State) ->
    {ok, _Q} = ensure_queue(send, Destination, Channel),

    Props = rabbit_stomp_util:message_properties(Frame),

    {Exchange, RoutingKey} =
        rabbit_stomp_util:parse_routing_information(Destination),

    Publish = #'basic.publish'{exchange    = list_to_binary(Exchange),
                               routing_key = list_to_binary(RoutingKey),
                               mandatory   = false,
                               immediate   = false},

    case transactional(Frame) of
        no ->
            ok(send_method(Publish, Props, BodyFragments, State));
        {yes, Transaction} ->
            extend_transaction(Transaction,
                               {Publish, Props, BodyFragments},
                               State)
    end.
|
|
|
|
|
|
2011-01-11 21:57:59 +08:00
|
|
|
%% Builds the basic.ack for a delivery; 'multiple' is taken from the
%% subscription's multi_ack flag.
create_ack_method(DeliveryTag, #subscription{multi_ack = Multiple}) ->
    #'basic.ack'{multiple     = Multiple,
                 delivery_tag = DeliveryTag}.
|
2011-01-11 19:25:44 +08:00
|
|
|
|
2011-01-11 21:57:59 +08:00
|
|
|
%% Builds the basic.reject used for NACK. The subscription is accepted
%% only so the signature matches create_ack_method/2; it is not used.
create_nack_method(DeliveryTag, _Subscription) ->
    Reject = #'basic.reject'{delivery_tag = DeliveryTag},
    Reject.
|
2011-01-11 19:25:44 +08:00
|
|
|
|
2010-11-09 19:57:20 +08:00
|
|
|
%% Negotiates the protocol version from the frame's comma-separated
%% 'accept-version' header; a frame without the header is treated as
%% offering "1.0" only. The result is that of
%% rabbit_stomp_util:negotiate_version/2 against ?SUPPORTED_VERSIONS.
negotiate_version(Frame) ->
    Accepted = rabbit_stomp_frame:header(Frame, "accept-version", "1.0"),
    ClientVers = re:split(Accepted, ",", [{return, list}]),
    rabbit_stomp_util:negotiate_version(ClientVers, ?SUPPORTED_VERSIONS).
|
|
|
|
|
|
2010-10-28 05:14:19 +08:00
|
|
|
%% Sends a RECEIPT frame echoing the 'receipt' header of Frame, if it
%% has one. Returns the state in either case.
ensure_receipt(Frame, State) ->
    case rabbit_stomp_frame:header(Frame, "receipt") of
        not_found ->
            State;
        {ok, Id} ->
            ReceiptHeaders = [{"receipt-id", Id}],
            send_frame("RECEIPT", ReceiptHeaders, "", State)
    end.
|
|
|
|
|
|
2010-10-28 19:04:40 +08:00
|
|
|
%% Forwards an AMQP delivery to the client as a MESSAGE frame. The
%% destination header is the one recorded for the consumer tag at
%% SUBSCRIBE time. A delivery for an unknown tag results in an ERROR
%% frame instead. Returns the state.
send_delivery(#'basic.deliver'{consumer_tag = ConsumerTag} = Delivery,
              Properties, Body,
              #state{session_id    = SessionId,
                     subscriptions = Subs} = State) ->
    case dict:find(ConsumerTag, Subs) of
        error ->
            send_error("Subscription not found",
                       "There is no current subscription with tag '~s'.",
                       [ConsumerTag],
                       State);
        {ok, #subscription{dest_hdr = Destination}} ->
            Headers = rabbit_stomp_util:message_headers(
                        Destination, SessionId, Delivery, Properties),
            send_frame("MESSAGE", Headers, Body, State)
    end.
|
2010-10-28 05:14:19 +08:00
|
|
|
|
2011-01-06 23:54:05 +08:00
|
|
|
%% send_method/3: issues Method on the given Channel with a
%% synchronous amqp_channel:call/2. The reply is discarded and the
%% state returned unchanged.
send_method(Method, Channel, State) ->
    _Reply = amqp_channel:call(Channel, Method),
    State.

%% send_method/2: same as send_method/3, on the session's main channel.
send_method(Method, #state{channel = MainChannel} = State) ->
    send_method(Method, MainChannel, State).

%% send_method/4: same as send_method/5, on the session's main channel.
send_method(Method, Properties, BodyFragments,
            #state{channel = MainChannel} = State) ->
    send_method(Method, MainChannel, Properties, BodyFragments, State).

%% send_method/5: issues a content-bearing Method on Channel.
%% BodyFragments is reversed before being used as the payload.
send_method(Method, Channel, Properties, BodyFragments, State) ->
    Content = #amqp_msg{props   = Properties,
                        payload = lists:reverse(BodyFragments)},
    _Reply = amqp_channel:call(Channel, Method, Content),
    State.
|
|
|
|
|
|
|
|
|
|
%% Closes every per-subscription channel, then the main channel, then
%% the connection, and returns the state with both reset to 'none'.
%%
%% terminate/2 calls this unconditionally, including for clients that
%% never completed CONNECT. In that case channel and connection are
%% still 'none' (they are only ever set together, in do_login/6), so
%% there is nothing to close and the state is returned as is instead
%% of attempting amqp_channel:close(none).
shutdown_channel_and_connection(State = #state{channel = none}) ->
    State;
shutdown_channel_and_connection(State = #state{channel       = Channel,
                                               connection    = Connection,
                                               subscriptions = Subs}) ->
    dict:fold(
      fun(_ConsumerTag, #subscription{channel = SubChannel}, Acc) ->
              case SubChannel of
                  %% Subscriptions sharing the main channel are covered
                  %% by the close of the main channel below.
                  Channel -> Acc;
                  _       -> amqp_channel:close(SubChannel),
                             Acc
              end
      end, ok, Subs),

    amqp_channel:close(Channel),
    amqp_connection:close(Connection),
    State#state{channel = none, connection = none}.
|
|
|
|
|
|
2010-11-09 22:12:59 +08:00
|
|
|
%% Default prefetch count for a parsed destination:
%% ?DEFAULT_QUEUE_PREFETCH for {queue, _} destinations and 'undefined'
%% (no prefetch limit requested) for everything else.
default_prefetch(Destination) ->
    case Destination of
        {queue, _} -> ?DEFAULT_QUEUE_PREFETCH;
        _          -> undefined
    end.
|
2010-10-28 05:14:19 +08:00
|
|
|
|
|
|
|
|
%%----------------------------------------------------------------------------
|
|
|
|
|
%% Transaction Support
|
|
|
|
|
%%----------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
%% Tells whether Frame belongs to a transaction: {yes, Id} when it has
%% a 'transaction' header, 'no' otherwise.
transactional(Frame) ->
    case rabbit_stomp_frame:header(Frame, "transaction") of
        not_found   -> no;
        {ok, TxnId} -> {yes, TxnId}
    end.
|
|
|
|
|
|
|
|
|
|
%% Runs Fun(Transaction, State) for a frame that must carry a
%% 'transaction' header (BEGIN, COMMIT, ABORT). Name is the command
%% name, used only in the error detail when the header is missing.
transactional_action(Frame, Name, Fun, State) ->
    case transactional(Frame) of
        no ->
            error("Missing transaction",
                  Name ++ " must include a 'transaction' header\n",
                  State);
        {yes, Transaction} ->
            Fun(Transaction, State)
    end.
|
|
|
|
|
|
|
|
|
|
%% Looks up the pending actions of Transaction, which are kept in the
%% process dictionary under {transaction, Transaction}, and calls
%% Fun(Actions, State). An id that was never begun (or was already
%% committed or aborted) yields a "Bad transaction" error.
with_transaction(Transaction, State, Fun) ->
    Key = {transaction, Transaction},
    case get(Key) of
        undefined ->
            error("Bad transaction",
                  "Invalid transaction identifier: ~p\n",
                  [Transaction],
                  State);
        Pending ->
            Fun(Pending, State)
    end.
|
|
|
|
|
|
|
|
|
|
%% Starts (or resets) Transaction with an empty list of pending
%% actions in the process dictionary.
begin_transaction(Transaction, State) ->
    Key = {transaction, Transaction},
    put(Key, []),
    ok(State).
|
2010-10-28 05:14:19 +08:00
|
|
|
|
|
|
|
|
%% Adds Action to Transaction. Actions are prepended, so the stored
%% list is newest-first. Fails with "Bad transaction" if the
%% transaction does not exist.
extend_transaction(Transaction, Action, State0) ->
    Prepend = fun (Pending, State) ->
                      put({transaction, Transaction}, [Action | Pending]),
                      ok(State)
              end,
    with_transaction(Transaction, State0, Prepend).
|
|
|
|
|
|
|
|
|
|
%% Performs all pending actions of Transaction and then forgets it.
%% The stored list is newest-first, so a right fold executes the
%% actions in the order in which they were added.
commit_transaction(Transaction, State0) ->
    Commit = fun (Pending, State) ->
                     FinalState =
                         lists:foldr(fun perform_transaction_action/2,
                                     State, Pending),
                     erase({transaction, Transaction}),
                     ok(FinalState)
             end,
    with_transaction(Transaction, State0, Commit).
|
|
|
|
|
|
|
|
|
|
%% Discards Transaction together with its pending actions; nothing is
%% sent to the broker.
abort_transaction(Transaction, State0) ->
    Discard = fun (_Pending, State) ->
                      erase({transaction, Transaction}),
                      ok(State)
              end,
    with_transaction(Transaction, State0, Discard).
|
|
|
|
|
|
|
|
|
|
%% Executes one queued transaction action and returns the state.
%% The tuple size selects the kind of action:
%%   {Method}                       - method on the main channel;
%%   {Channel, Method}              - method on a specific channel
%%                                    (queued by ack_action/4);
%%   {Method, Props, BodyFragments} - publish on the main channel
%%                                    (queued by do_send/4).
perform_transaction_action({Method}, State) ->
    send_method(Method, State);
perform_transaction_action({SubChannel, Method}, State) ->
    send_method(Method, SubChannel, State);
perform_transaction_action({Publish, Props, BodyFragments}, State) ->
    send_method(Publish, Props, BodyFragments, State).
|
|
|
|
|
|
2010-11-10 03:10:56 +08:00
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
%% Heartbeat Management
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
2010-11-11 05:09:20 +08:00
|
|
|
%% Parses the client's "CX,CY" heartbeat header (two integers, in
%% milliseconds) and starts heartbeating through the state's
%% start_heartbeat_fun.
%%
%% The send timeout is derived from CY and the receive timeout from
%% CX, each converted to whole seconds by millis_to_seconds/1. The
%% send callback writes a single zero byte to the socket; the receive
%% callback casts client_timeout to this process.
%%
%% Returns {{SendMillis, ReceiveMillis}, State}, the timeouts being
%% the rounded values converted back to milliseconds.
ensure_heartbeats(Heartbeats,
                  #state{socket = Sock, start_heartbeat_fun = SHF} = State) ->
    Parts = re:split(Heartbeats, ",", [{return, list}]),
    [CX, CY] = lists:map(fun list_to_integer/1, Parts),

    Self = self(),
    SendFun = fun() -> catch gen_tcp:send(Sock, <<0>>) end,
    ReceiveFun = fun() -> gen_server2:cast(Self, client_timeout) end,

    SendTimeout = millis_to_seconds(CY),
    ReceiveTimeout = millis_to_seconds(CX),

    SHF(Sock, SendTimeout, SendFun, ReceiveTimeout, ReceiveFun),

    {{SendTimeout * 1000, ReceiveTimeout * 1000}, State}.
|
|
|
|
|
|
|
|
|
|
%% Converts milliseconds to whole seconds. Zero and negative values
%% give 0; any positive value below one second is rounded up to 1;
%% everything else is truncated (integer division by 1000).
millis_to_seconds(Millis) when Millis =< 0 ->
    0;
millis_to_seconds(Millis) when Millis < 1000 ->
    1;
millis_to_seconds(Millis) ->
    Millis div 1000.
|
2010-11-10 03:10:56 +08:00
|
|
|
|
2010-10-28 05:14:19 +08:00
|
|
|
%%----------------------------------------------------------------------------
|
|
|
|
|
%% Queue and Binding Setup
|
|
|
|
|
%%----------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
%% Makes sure the queue needed for an operation on a destination
%% exists. Returns {ok, QueueName}, or {ok, undefined} when no queue is
%% involved.
%%
%%   /exchange, /topic + subscribe - a server-named, exclusive,
%%                                   auto-delete queue is declared;
%%   /exchange, /topic + send      - no queue is created;
%%   /queue (any operation)        - the named durable queue is
%%                                   declared with nowait set, using a
%%                                   cast.
ensure_queue(subscribe, {exchange, _}, Channel) ->
    declare_anonymous_queue(Channel);
ensure_queue(send, {exchange, _}, _Channel) ->
    {ok, undefined};
ensure_queue(_, {queue, Name}, Channel) ->
    Queue = list_to_binary(Name),
    Declare = #'queue.declare'{durable = true,
                               queue   = Queue,
                               nowait  = true},
    amqp_channel:cast(Channel, Declare),
    {ok, Queue};
ensure_queue(subscribe, {topic, _}, Channel) ->
    declare_anonymous_queue(Channel);
ensure_queue(send, {topic, _}, _Channel) ->
    {ok, undefined}.

%% Declares an exclusive, auto-delete queue without giving a name and
%% returns {ok, Name} with the name reported in queue.declare_ok.
declare_anonymous_queue(Channel) ->
    Declare = #'queue.declare'{auto_delete = true,
                               exclusive   = true},
    #'queue.declare_ok'{queue = Queue} = amqp_channel:call(Channel, Declare),
    {ok, Queue}.
|
|
|
|
|
|
2010-11-09 20:04:30 +08:00
|
|
|
%% Binds a queue according to {Exchange, RoutingKey}.
%%
%% For the default exchange ("") no queue.bind is issued; the clause
%% only asserts that the routing key equals the queue's own name,
%% since that is the only binding we should ever be asked for there.
%% For any other exchange a queue.bind is issued and must succeed.
ensure_queue_binding(QueueBin, {"", Queue}, _Channel) ->
    QueueBin = list_to_binary(Queue),
    ok;
ensure_queue_binding(Queue, {Exchange, RoutingKey}, Channel) ->
    Bind = #'queue.bind'{queue       = Queue,
                         exchange    = list_to_binary(Exchange),
                         routing_key = list_to_binary(RoutingKey)},
    #'queue.bind_ok'{} = amqp_channel:call(Channel, Bind),
    ok.
|
2010-11-26 19:10:43 +08:00
|
|
|
%%----------------------------------------------------------------------------
|
|
|
|
|
%% Success/error handling
|
|
|
|
|
%%----------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
%% Success result with no frame to send back to the client.
ok(NewState) ->
    NoFrame = none,
    {ok, NoFrame, NewState}.
|
|
|
|
|
|
|
|
|
|
%% Success result carrying a frame, built from Command, Headers and
%% BodyFragments, that process_request/3 will send to the client.
ok(Command, Headers, BodyFragments, State) ->
    Reply = #stomp_frame{command     = Command,
                         headers     = Headers,
                         body_iolist = BodyFragments},
    {ok, Reply, State}.
|
|
|
|
|
|
2011-01-17 18:38:43 +08:00
|
|
|
%% Reports a server-initiated AMQP close to the client and requests a
%% stop. The ERROR frame's message is the exception name that
%% ?PROTOCOL:amqp_exception/1 gives for ReplyCode, and its body is the
%% broker's explanation. Returns {stop, amqp_death, State}.
amqp_death(ReplyCode, Explanation, State) ->
    ErrorName = ?PROTOCOL:amqp_exception(ReplyCode),
    Message = atom_to_list(ErrorName),
    Detail = format_detail("~s~n", [Explanation]),
    {stop, amqp_death, send_error(Message, Detail, State)}.
|
2010-11-26 19:10:43 +08:00
|
|
|
|
|
|
|
|
%% error/3: error result with a ready-made detail string and no
%% server-private detail.
error(Message, Detail, State) ->
    NoPrivateDetail = none,
    priv_error(Message, Detail, NoPrivateDetail, State).

%% error/4: as error/3, with the detail given as an io_lib format
%% string and its arguments.
error(Message, Format, Args, State) ->
    NoPrivateDetail = none,
    priv_error(Message, Format, Args, NoPrivateDetail, State).
|
|
|
|
|
|
|
|
|
|
%% priv_error/4: logs the error, including ServerPrivateDetail, and
%% returns {error, Message, Detail, State}. ServerPrivateDetail only
%% goes to the log; it is not part of the returned tuple.
priv_error(Message, Detail, ServerPrivateDetail, State) ->
    LogFormat = "STOMP error frame sent:~n"
                "Message: ~p~n"
                "Detail: ~p~n"
                "Server private detail: ~p~n",
    error_logger:error_msg(LogFormat,
                           [Message, Detail, ServerPrivateDetail]),
    {error, Message, Detail, State}.

%% priv_error/5: as priv_error/4, with the detail given as a format
%% string and its arguments.
priv_error(Message, Format, Args, ServerPrivateDetail, State) ->
    Detail = format_detail(Format, Args),
    priv_error(Message, Detail, ServerPrivateDetail, State).
|
|
|
|
|
|
2011-01-17 18:50:24 +08:00
|
|
|
%% Formats Args according to Format (io_lib:format/2) and returns the
%% result as a flat string.
format_detail(Format, Args) ->
    Chars = io_lib:format(Format, Args),
    lists:flatten(Chars).
|
2010-10-28 05:14:19 +08:00
|
|
|
%%----------------------------------------------------------------------------
|
|
|
|
|
%% Frame sending utilities
|
|
|
|
|
%%----------------------------------------------------------------------------
|
|
|
|
|
%% send_frame/4: builds a frame from its parts and sends it.
send_frame(Command, Headers, BodyFragments, State) ->
    Frame = #stomp_frame{command     = Command,
                         headers     = Headers,
                         body_iolist = BodyFragments},
    send_frame(Frame, State).

%% send_frame/2: serialises Frame and writes it to the client socket.
%% The result of the write is deliberately not inspected: certain
%% errors are ignored here because an asynchronous notification of the
%% same (or a related) fault is expected shortly anyway. See bug 21365.
%% Returns the state unchanged.
send_frame(Frame, #state{socket = Sock} = State) ->
    Serialized = rabbit_stomp_frame:serialize(Frame),
    rabbit_net:port_command(Sock, Serialized),
    State.
|
2010-10-28 05:14:19 +08:00
|
|
|
|
|
|
|
|
%% send_error/3: sends an ERROR frame whose headers carry Message, a
%% text/plain content type and the comma-separated supported versions,
%% and whose body is Detail. Returns the state.
send_error(Message, Detail, State) ->
    Versions = string:join(?SUPPORTED_VERSIONS, ","),
    Headers = [{"message", Message},
               {"content-type", "text/plain"},
               {"version", Versions}],
    send_frame("ERROR", Headers, Detail, State).

%% send_error/4: as send_error/3, with the detail given as a format
%% string and its arguments.
send_error(Message, Format, Args, State) ->
    Detail = format_detail(Format, Args),
    send_error(Message, Detail, State).
|
2011-01-13 22:18:25 +08:00
|
|
|
|
2010-10-28 05:14:19 +08:00
|
|
|
%%----------------------------------------------------------------------------
|
2011-01-31 21:00:15 +08:00
|
|
|
%% Skeleton gen_server2 callbacks
|
2010-10-28 05:14:19 +08:00
|
|
|
%%----------------------------------------------------------------------------
|
2011-02-10 00:35:03 +08:00
|
|
|
%% gen_server2 callback. This server defines no synchronous API: every
%% call is ignored and the state kept as is. Note that no reply is
%% ever sent from here.
handle_call(_Request, _Caller, State) ->
    {noreply, State}.
|
|
|
|
|
|
2010-10-28 05:14:19 +08:00
|
|
|
%% gen_server2 callback. No state migration is needed on code upgrade;
%% the state is returned as is.
code_change(_FromVsn, CurrentState, _Extra) ->
    {ok, CurrentState}.
|
|
|
|
|
|