2012-02-03 01:52:58 +08:00
|
|
|
%% The contents of this file are subject to the Mozilla Public License
|
|
|
|
|
%% Version 1.1 (the "License"); you may not use this file except in
|
|
|
|
|
%% compliance with the License. You may obtain a copy of the License
|
|
|
|
|
%% at http://www.mozilla.org/MPL/
|
2010-10-28 05:14:19 +08:00
|
|
|
%%
|
2012-02-03 01:52:58 +08:00
|
|
|
%% Software distributed under the License is distributed on an "AS IS"
|
|
|
|
|
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
|
|
|
|
|
%% the License for the specific language governing rights and
|
|
|
|
|
%% limitations under the License.
|
2010-10-28 05:14:19 +08:00
|
|
|
%%
|
2012-02-03 01:52:58 +08:00
|
|
|
%% The Original Code is RabbitMQ.
|
2010-10-28 05:14:19 +08:00
|
|
|
%%
|
2012-02-03 01:52:58 +08:00
|
|
|
%% The Initial Developer of the Original Code is VMware, Inc.
|
|
|
|
|
%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
|
2010-10-28 05:14:19 +08:00
|
|
|
%%
|
2012-02-03 01:52:58 +08:00
|
|
|
|
2010-10-28 05:14:19 +08:00
|
|
|
-module(rabbit_stomp_processor).
|
2011-01-31 21:00:15 +08:00
|
|
|
-behaviour(gen_server2).
|
2010-10-28 05:14:19 +08:00
|
|
|
|
2011-08-24 21:50:48 +08:00
|
|
|
-export([start_link/3, process_frame/2, flush_and_die/1]).
|
2010-10-28 05:14:19 +08:00
|
|
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
|
|
|
|
code_change/3, terminate/2]).
|
|
|
|
|
|
|
|
|
|
-include_lib("amqp_client/include/amqp_client.hrl").
|
|
|
|
|
-include("rabbit_stomp_frame.hrl").
|
2011-06-06 18:03:17 +08:00
|
|
|
-include("rabbit_stomp.hrl").
|
2011-11-11 01:39:33 +08:00
|
|
|
-include("rabbit_stomp_prefixes.hrl").
|
2011-11-25 20:11:22 +08:00
|
|
|
-include("rabbit_stomp_headers.hrl").
|
2010-10-28 05:14:19 +08:00
|
|
|
|
2012-02-16 01:08:54 +08:00
|
|
|
-record(state, {transport, session_id, channel,
|
2010-11-10 03:10:56 +08:00
|
|
|
connection, subscriptions, version,
|
2011-06-06 18:03:17 +08:00
|
|
|
start_heartbeat_fun, pending_receipts,
|
2012-02-18 01:52:15 +08:00
|
|
|
config, dest_queues, reply_queues, frame_transformer,
|
|
|
|
|
adapter_info, send_frame}).
|
2010-11-09 19:57:20 +08:00
|
|
|
|
2011-01-19 00:24:42 +08:00
|
|
|
-record(subscription, {dest_hdr, channel, multi_ack, description}).
|
2010-11-09 22:12:59 +08:00
|
|
|
|
2010-11-09 19:57:20 +08:00
|
|
|
-define(SUPPORTED_VERSIONS, ["1.0", "1.1"]).
|
2011-02-01 19:30:50 +08:00
|
|
|
-define(FLUSH_TIMEOUT, 60000).
|
2010-10-28 05:14:19 +08:00
|
|
|
|
|
|
|
|
%%----------------------------------------------------------------------------
|
|
|
|
|
%% Public API
|
|
|
|
|
%%----------------------------------------------------------------------------
|
2012-02-16 01:08:54 +08:00
|
|
|
%% Starts a processor as a gen_server2 linked to the caller. The three
%% arguments are handed to init/1 unchanged, in the same order.
start_link(Transport, StartHeartbeatFun, Configuration) ->
    InitArgs = [Transport, StartHeartbeatFun, Configuration],
    gen_server2:start_link(?MODULE, InitArgs, []).
|
2010-10-28 05:14:19 +08:00
|
|
|
|
2012-01-18 22:48:54 +08:00
|
|
|
%% Hands a parsed frame to the processor Pid as an asynchronous cast of
%% the form {Command, Frame, FlowPid}.
%%
%% SEND frames take part in credit flow: credit_flow:send/1 is called
%% first and the caller's pid travels with the cast so the processor can
%% acknowledge it. Every other command is tagged with 'noflow'.
process_frame(Pid, #stomp_frame{command = Command} = Frame) ->
    FlowPid = case Command of
                  "SEND" -> credit_flow:send(Pid),
                            self();
                  _      -> noflow
              end,
    gen_server2:cast(Pid, {Command, Frame, FlowPid}).
|
2010-10-28 05:14:19 +08:00
|
|
|
|
2011-08-24 21:50:48 +08:00
|
|
|
%% Asks the processor Pid (asynchronously) to close its AMQP connection
%% and stop; see the flush_and_die clause of handle_cast/2.
flush_and_die(Pid) ->
    Request = flush_and_die,
    gen_server2:cast(Pid, Request).
|
|
|
|
|
|
2010-10-28 05:14:19 +08:00
|
|
|
%%----------------------------------------------------------------------------
|
2011-01-31 21:00:15 +08:00
|
|
|
%% Basic gen_server2 callbacks
|
2010-10-28 05:14:19 +08:00
|
|
|
%%----------------------------------------------------------------------------
|
|
|
|
|
|
2012-02-16 01:08:54 +08:00
|
|
|
%% gen_server2 init callback. Traps exits (so 'EXIT' messages from the
%% linked AMQP connection reach handle_info/2) and builds the initial,
%% not-yet-connected state: no session, channel, connection or version.
init([Transport, StartHeartbeatFun, Configuration]) ->
    process_flag(trap_exit, true),

    %% Fun stored in the state and used to write a serialised frame to
    %% the socket. We ignore certain errors here, as we will be receiving
    %% an asynchronous notification of the same (or a related) fault
    %% shortly anyway. See bug 21365.
    Writer =
        fun (Frame) ->
                {SockMod, Sock} = Transport,
                catch SockMod:port_command(
                        Sock, rabbit_stomp_frame:serialize(Frame))
        end,

    InitialState =
        #state{transport           = Transport,
               session_id          = none,
               channel             = none,
               connection          = none,
               subscriptions       = dict:new(),
               version             = none,
               start_heartbeat_fun = StartHeartbeatFun,
               pending_receipts    = undefined,
               config              = Configuration,
               dest_queues         = sets:new(),
               reply_queues        = dict:new(),
               frame_transformer   = undefined,
               adapter_info        = adapter_info(Transport),
               send_frame          = Writer},

    %% Start hibernated, with the gen_server2 backoff settings.
    {ok, InitialState, hibernate, {backoff, 1000, 1000, 10000}}.
|
|
|
|
|
|
|
|
|
|
%% gen_server2 terminate callback: whatever the reason, make sure the
%% AMQP connection is closed before the process goes away.
terminate(_Reason, State) ->
    ClosedState = close_connection(State),
    ClosedState.
|
2010-10-28 05:14:19 +08:00
|
|
|
|
2011-08-24 21:50:48 +08:00
|
|
|
%% gen_server2 cast callback. Casts are either control atoms
%% (flush_and_die, client_timeout) or {Command, Frame, FlowPid} tuples
%% produced by process_frame/2.
%%
%% Clause order matters:
%%   1. control atoms, which must work whether or not we are connected;
%%   2. explicit STOMP/CONNECT frames;
%%   3. any other request while not connected (channel = none): either
%%      attempt an implicit connect or reject the request;
%%   4. ordinary frames on an established session.

%% Close the AMQP connection and stop normally.
handle_cast(flush_and_die, State) ->
    {stop, normal, close_connection(State)};

%% Handled before the "not connected" clauses so that a timeout always
%% stops the processor and is never mistaken for a client command.
handle_cast(client_timeout, State) ->
    {stop, client_timeout, State};

handle_cast({"STOMP", Frame, noflow}, State) ->
    process_connect(no_implicit, Frame, State);

handle_cast({"CONNECT", Frame, noflow}, State) ->
    process_connect(no_implicit, Frame, State);

%% Not connected, implicit connect enabled: log in with an empty CONNECT
%% frame (so the configured defaults apply), then replay the request.
handle_cast(Request, State = #state{channel = none,
                                     config = #stomp_configuration{
                                       implicit_connect = true}}) ->
    case process_connect(implicit, #stomp_frame{headers = []}, State) of
        {noreply, State1 = #state{channel = none}, _} ->
            %% The implicit login failed and the ERROR frame has already
            %% been sent. Replaying the request would match this very
            %% clause again and loop forever, so stop instead.
            {stop, normal, State1};
        {noreply, State1, _} ->
            handle_cast(Request, State1);
        {stop, _Reason, _State1} = Stop ->
            Stop
    end;

%% Not connected and implicit connect disabled: reject the request.
handle_cast(_Request, State = #state{channel = none,
                                      config = #stomp_configuration{
                                        implicit_connect = false}}) ->
    {noreply,
     send_error("Illegal command",
                "You must log in using CONNECT first\n",
                State),
     hibernate};

%% Ordinary frame on an established session.
handle_cast({Command, Frame, FlowPid},
            State = #state{frame_transformer = FT}) ->
    %% Acknowledge credit for frames that were sent under credit flow.
    case FlowPid of
        noflow -> ok;
        _      -> credit_flow:ack(FlowPid)
    end,
    %% Apply the version-specific frame transformation chosen at CONNECT.
    Frame1 = FT(Frame),
    process_request(
      fun(StateN) ->
              case validate_frame(Command, Frame1, StateN) of
                  R = {error, _, _, _} -> R;
                  _                    -> handle_frame(Command, Frame1,
                                                       StateN)
              end
      end,
      fun(StateM) -> ensure_receipt(Frame1, StateM) end,
      State).
|
2010-10-28 05:14:19 +08:00
|
|
|
|
|
|
|
|
%% gen_server2 info callback: messages from AMQP channels, the linked
%% AMQP connection, the socket port and the credit-flow machinery.

%% Consumer registration and cancellation confirmations need no action.
handle_info(#'basic.consume_ok'{}, State) ->
    {noreply, State, hibernate};
handle_info(#'basic.cancel_ok'{}, State) ->
    {noreply, State, hibernate};

%% Publisher confirm: flush the receipts waiting on this delivery tag.
handle_info(#'basic.ack'{delivery_tag = Tag, multiple = IsMulti}, State) ->
    NewState = flush_pending_receipts(Tag, IsMulti, State),
    {noreply, NewState, hibernate};

%% A message delivered to one of our consumers.
handle_info({#'basic.deliver'{} = Delivery,
             #amqp_msg{props = Props, payload = Payload}}, State) ->
    NewState = send_delivery(Delivery, Props, Payload, State),
    {noreply, NewState, hibernate};

%% Our own AMQP connection exited (Conn must equal the connection held
%% in the state). A server-initiated close goes through amqp_death/3;
%% anything else is reported to the client and stops the processor.
handle_info({'EXIT', Conn, Reason}, State = #state{connection = Conn}) ->
    case Reason of
        {shutdown, {server_initiated_close, Code, Explanation}} ->
            amqp_death(Code, Explanation, State);
        _ ->
            send_error("AMQP connection died", "Reason: ~p", [Reason],
                       State),
            {stop, {conn_died, Reason}, State}
    end;

%% Result of a port_command on the socket: 'ok' is ignored, any other
%% status becomes the stop reason.
handle_info({inet_reply, _, Status}, State) ->
    case Status of
        ok -> {noreply, State, hibernate};
        _  -> {stop, Status, State}
    end;

%% Credit-flow bookkeeping.
handle_info({bump_credit, Msg}, State) ->
    credit_flow:handle_bump_msg(Msg),
    {noreply, State, hibernate}.
|
2010-10-28 05:14:19 +08:00
|
|
|
|
2010-11-26 19:10:43 +08:00
|
|
|
%% Runs ProcessFun(State) and turns its outcome into a gen_server2
%% callback result.
%%
%% ProcessFun returns one of:
%%   {ok, Frame | none, NewState}       - optionally send Frame, then
%%                                        apply SuccessFun to NewState
%%   {error, Message, Detail, NewState} - send an ERROR frame
%%   {stop, Reason, NewState}           - stop; SuccessFun is applied
%%                                        only when Reason is 'normal'
%%
%% Exits raised by ProcessFun are caught: a server-initiated AMQP close
%% is handed to amqp_death/3, any other exit to priv_error/4.
process_request(ProcessFun, SuccessFun, State) ->
    Outcome =
        case catch ProcessFun(State) of
            {'EXIT', {{shutdown,
                       {server_initiated_close, Code, Text}}, _}} ->
                amqp_death(Code, Text, State);
            {'EXIT', Reason} ->
                priv_error("Processing error", "Processing error\n",
                           Reason, State);
            Other ->
                Other
        end,
    case Outcome of
        {ok, none, NewState} ->
            {noreply, SuccessFun(NewState), hibernate};
        {ok, Frame, NewState} ->
            send_frame(Frame, NewState),
            {noreply, SuccessFun(NewState), hibernate};
        {error, Message, Detail, NewState} ->
            {noreply, send_error(Message, Detail, NewState), hibernate};
        {stop, normal, NewState} ->
            {stop, normal, SuccessFun(NewState)};
        {stop, StopReason, NewState} ->
            {stop, StopReason, NewState}
    end.
|
|
|
|
|
|
2011-07-14 00:35:21 +08:00
|
|
|
%% Handles a CONNECT/STOMP frame, or an implicit connect, for a
%% processor that has no channel yet (the head only matches
%% channel = none).
%%
%% Negotiates the protocol version, picks the matching frame
%% transformer, then logs in using the frame's login, passcode, host and
%% heart-beat headers. Missing headers fall back to the configured
%% default login/passcode, the broker's default vhost and "0,0".
%%
%% Implicit is 'implicit' or 'no_implicit'. For an implicit connect a
%% successful login's reply frame is dropped (ok/1 on the new state).
process_connect(Implicit,
                Frame,
                State = #state{
                  channel      = none,
                  config       = #stomp_configuration{
                    default_login    = DefaultLogin,
                    default_passcode = DefaultPasscode},
                  adapter_info = AdapterInfo}) ->
    Connect =
        fun(StateN) ->
                case negotiate_version(Frame) of
                    {ok, Version} ->
                        FT = frame_transformer(Version),
                        Frame1 = FT(Frame),
                        {ok, DefaultVHost} =
                            application:get_env(rabbit, default_vhost),
                        Header =
                            fun (Name, Default) ->
                                    rabbit_stomp_frame:header(Frame1, Name,
                                                              Default)
                            end,
                        Login     = Header(?HEADER_LOGIN, DefaultLogin),
                        Passcode  = Header(?HEADER_PASSCODE, DefaultPasscode),
                        VHost     = Header(?HEADER_HOST,
                                           binary_to_list(DefaultVHost)),
                        Heartbeat = Header(?HEADER_HEART_BEAT, "0,0"),
                        Res = do_login(
                                Login, Passcode, VHost, Heartbeat,
                                AdapterInfo#adapter_info{
                                  protocol = {'STOMP', Version}},
                                Version,
                                StateN#state{frame_transformer = FT}),
                        case Res of
                            {ok, _, StateN1} when Implicit =:= implicit ->
                                ok(StateN1);
                            _ ->
                                Res
                        end;
                    {error, no_common_version} ->
                        error("Version mismatch",
                              "Supported versions are ~s\n",
                              [string:join(?SUPPORTED_VERSIONS, ",")],
                              StateN)
                end
        end,
    %% Nothing extra to do on success: the state is passed through as is.
    Identity = fun(StateM) -> StateM end,
    process_request(Connect, Identity, State).
|
|
|
|
|
|
2011-07-21 01:20:14 +08:00
|
|
|
%%----------------------------------------------------------------------------
|
2011-08-25 19:41:26 +08:00
|
|
|
%% Frame Transformation
|
|
|
|
|
%%----------------------------------------------------------------------------
|
|
|
|
|
%% Returns the fun applied to every incoming frame for the negotiated
%% protocol version: rabbit_stomp_util:trim_headers/1 for "1.0", the
%% identity for anything else.
frame_transformer(Version) ->
    case Version of
        "1.0" -> fun rabbit_stomp_util:trim_headers/1;
        _     -> fun(Frame) -> Frame end
    end.
|
|
|
|
|
|
|
|
|
|
%%----------------------------------------------------------------------------
|
2011-07-21 01:20:14 +08:00
|
|
|
%% Frame Validation
|
|
|
|
|
%%----------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
%% Checks a frame before it is dispatched to handle_frame/3.
%%
%% The only rule enforced here: a SUBSCRIBE or UNSUBSCRIBE whose
%% 'persistent' header is exactly "true" must also carry an 'id' header.
%% Returns ok(State) when the frame is acceptable, an error result
%% otherwise.
validate_frame(Command, Frame, State)
  when Command =:= "SUBSCRIBE"; Command =:= "UNSUBSCRIBE" ->
    Persistent = rabbit_stomp_frame:header(Frame, ?HEADER_PERSISTENT),
    Id         = rabbit_stomp_frame:header(Frame, ?HEADER_ID),
    case Persistent =:= {ok, "true"} andalso Id =:= not_found of
        true ->
            error("Missing Header",
                  "Header 'id' is required for durable subscriptions", State);
        false ->
            ok(State)
    end;
validate_frame(_Command, _Frame, State) ->
    ok(State).
|
|
|
|
|
|
2010-10-28 05:14:19 +08:00
|
|
|
%%----------------------------------------------------------------------------
|
|
|
|
|
%% Frame handlers
|
|
|
|
|
%%----------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
%% Dispatches a validated frame to the handler for its command. Unknown
%% commands produce a "Bad command" error result.
handle_frame(Command, Frame, State) ->
    case Command of
        "DISCONNECT" ->
            {stop, normal, close_connection(State)};
        "SUBSCRIBE" ->
            with_destination(Command, Frame, State, fun do_subscribe/4);
        "UNSUBSCRIBE" ->
            ConsumerTag = rabbit_stomp_util:consumer_tag(Frame),
            cancel_subscription(ConsumerTag, Frame, State);
        "SEND" ->
            %% Reject the headers that are not allowed on SEND before
            %% resolving the destination.
            without_headers(
              ?HEADERS_NOT_ON_SEND, Command, Frame, State,
              fun (_Command, Frame1, State1) ->
                      with_destination("SEND", Frame1, State1, fun do_send/4)
              end);
        "ACK" ->
            ack_action(Command, Frame, State, fun create_ack_method/2);
        "NACK" ->
            ack_action(Command, Frame, State, fun create_nack_method/2);
        "BEGIN" ->
            transactional_action(Frame, Command, fun begin_transaction/2,
                                 State);
        "COMMIT" ->
            transactional_action(Frame, Command, fun commit_transaction/2,
                                 State);
        "ABORT" ->
            transactional_action(Frame, Command, fun abort_transaction/2,
                                 State);
        _ ->
            error("Bad command",
                  "Could not interpret command ~p\n",
                  [Command],
                  State)
    end.
|
2010-10-28 05:14:19 +08:00
|
|
|
|
|
|
|
|
%%----------------------------------------------------------------------------
|
|
|
|
|
%% Internal helpers for the frame-processing callbacks
|
|
|
|
|
%%----------------------------------------------------------------------------
|
|
|
|
|
|
2011-01-11 21:57:59 +08:00
|
|
|
%% Shared implementation of ACK and NACK.
%%
%% Command   - "ACK" or "NACK"; used only in error messages.
%% Frame     - the client frame; must carry a 'message-id' header that
%%             rabbit_stomp_util:parse_message_id/1 accepts.
%% MethodFun - fun(DeliveryTag, #subscription{}) returning the AMQP
%%             method to issue (see create_ack_method/2 and
%%             create_nack_method/2).
%%
%% The method is issued on the channel of the subscription named by the
%% message id: immediately, or as part of the frame's transaction when
%% there is one. Returns an ok/error result for process_request/3.
ack_action(Command, Frame,
           State = #state{subscriptions = Subs}, MethodFun) ->
    case rabbit_stomp_frame:header(Frame, ?HEADER_MESSAGE_ID) of
        {ok, IdStr} ->
            case rabbit_stomp_util:parse_message_id(IdStr) of
                {ok, {ConsumerTag, _SessionId, DeliveryTag}} ->
                    %% dict:find/2 rather than dict:fetch/2: a message id
                    %% that names a subscription we do not hold is a
                    %% client error and is reported as one, not raised
                    %% as an exception.
                    case dict:find(ConsumerTag, Subs) of
                        {ok, Subscription =
                             #subscription{channel = SubChannel}} ->
                            Method = MethodFun(DeliveryTag, Subscription),
                            case transactional(Frame) of
                                {yes, Transaction} ->
                                    extend_transaction(Transaction,
                                                       {SubChannel, Method},
                                                       State);
                                no ->
                                    amqp_channel:call(SubChannel, Method),
                                    ok(State)
                            end;
                        error ->
                            error("Subscription not found",
                                  "Message with id ~p has no subscription\n",
                                  [IdStr],
                                  State)
                    end;
                _ ->
                    error("Invalid message-id",
                          "~p must include a valid 'message-id' header\n",
                          [Command],
                          State)
            end;
        not_found ->
            error("Missing message-id",
                  "~p must include a 'message-id' header\n",
                  [Command],
                  State)
    end.
|
2010-10-28 05:14:19 +08:00
|
|
|
|
|
|
|
|
%%----------------------------------------------------------------------------
|
|
|
|
|
%% Internal helpers for the frame-processing callbacks
|
|
|
|
|
%%----------------------------------------------------------------------------
|
|
|
|
|
|
2011-07-21 01:20:14 +08:00
|
|
|
%% Cancels the subscription identified by the result of
%% rabbit_stomp_util:consumer_tag/1 (the first argument).
%%
%% On a successful basic.cancel the subscription's channel is closed if
%% it is not the main channel, the subscription is removed from the
%% state, and maybe_delete_durable_sub/3 decides whether a durable
%% subscription queue must go too. Every failure becomes an error
%% result.
cancel_subscription({error, _}, _Frame, State) ->
    error("Missing destination or id",
          "UNSUBSCRIBE must include a 'destination' or 'id' header\n",
          State);

cancel_subscription({ok, ConsumerTag, Description}, Frame,
                    State = #state{channel       = MainChannel,
                                   subscriptions = Subs}) ->
    case dict:find(ConsumerTag, Subs) of
        {ok, #subscription{dest_hdr = DestHdr, channel = SubChannel}} ->
            Cancel = #'basic.cancel'{consumer_tag = ConsumerTag},
            case amqp_channel:call(SubChannel, Cancel) of
                #'basic.cancel_ok'{consumer_tag = ConsumerTag} ->
                    ok = ensure_subchannel_closed(SubChannel, MainChannel),
                    Remaining = dict:erase(ConsumerTag, Subs),
                    maybe_delete_durable_sub(
                      DestHdr, Frame,
                      State#state{subscriptions = Remaining});
                _ ->
                    error("Failed to cancel subscription",
                          "UNSUBSCRIBE to ~p failed.\n",
                          [Description],
                          State)
            end;
        error ->
            error("No subscription found",
                  "UNSUBSCRIBE must refer to an existing subscription.\n"
                  "Subscription to ~p not found.\n",
                  [Description],
                  State)
    end.
|
|
|
|
|
|
2011-07-21 01:20:14 +08:00
|
|
|
%% After an UNSUBSCRIBE: when the subscription's destination is a topic
%% and the frame's 'persistent' header is true, delete the durable
%% subscription queue derived from the topic name and the 'id' header.
%% Always returns ok(State).
maybe_delete_durable_sub(DestHdr, Frame, State = #state{channel = Channel}) ->
    case rabbit_stomp_util:parse_destination(DestHdr) of
        {ok, {topic, Name}} ->
            case rabbit_stomp_frame:boolean_header(
                   Frame, ?HEADER_PERSISTENT, false) of
                true ->
                    {ok, Id} = rabbit_stomp_frame:header(Frame, ?HEADER_ID),
                    QName = rabbit_stomp_util:durable_subscription_queue(
                              Name, Id),
                    Delete = #'queue.delete'{queue  = QName,
                                             nowait = false},
                    amqp_channel:call(Channel, Delete);
                false ->
                    ok
            end;
        _ ->
            ok
    end,
    ok(State).
|
|
|
|
|
|
|
|
|
|
%% Closes SubChannel unless it is the main channel, which must stay open
%% for the rest of the session. Always returns ok.
ensure_subchannel_closed(SubChannel, MainChannel) ->
    case SubChannel == MainChannel of
        true ->
            ok;
        false ->
            amqp_channel:close(SubChannel),
            ok
    end.
|
2011-01-10 21:35:43 +08:00
|
|
|
|
2010-10-28 05:14:19 +08:00
|
|
|
%% Resolves the frame's 'destination' header and, when it parses, calls
%% Fun(Destination, DestHdr, Frame, State). A missing, invalid or
%% unknown destination yields an error result instead. Command is used
%% only in the "missing destination" message.
with_destination(Command, Frame, State, Fun) ->
    case rabbit_stomp_frame:header(Frame, ?HEADER_DESTINATION) of
        not_found ->
            error("Missing destination",
                  "~p must include a 'destination' header\n",
                  [Command],
                  State);
        {ok, DestHdr} ->
            case rabbit_stomp_util:parse_destination(DestHdr) of
                {ok, Destination} ->
                    Fun(Destination, DestHdr, Frame, State);
                {error, {invalid_destination, Type, Content}} ->
                    error("Invalid destination",
                          "'~s' is not a valid ~p destination\n",
                          [Content, Type],
                          State);
                {error, {unknown_destination, Content}} ->
                    Prefixes =
                        string:join(rabbit_stomp_util:valid_dest_prefixes(),
                                    ", "),
                    Format = "'~s' is not a valid destination.\n" ++
                        "Valid destination types are: " ++
                        Prefixes ++
                        ".\n",
                    error("Unknown destination", Format, [Content], State)
            end
    end.
|
|
|
|
|
|
2011-11-25 20:49:56 +08:00
|
|
|
%% Calls Fun(Command, Frame, State) only if none of the listed headers
%% is present on the frame. The first listed header that is present
%% produces an "Invalid header" error result naming it.
without_headers(Hdrs, Command, Frame, State, Fun) ->
    IsAbsent =
        fun (Hdr) ->
                case rabbit_stomp_frame:header(Frame, Hdr) of
                    {ok, _}   -> false;
                    not_found -> true
                end
        end,
    case lists:dropwhile(IsAbsent, Hdrs) of
        [] ->
            Fun(Command, Frame, State);
        [Forbidden | _] ->
            error("Invalid header",
                  "'~s' is not allowed on '~s'.\n",
                  [Forbidden, Command],
                  State)
    end.
|
|
|
|
|
|
2011-06-06 18:03:17 +08:00
|
|
|
%% Authenticates the client and opens the AMQP connection and main
%% channel.
%%
%% Username0, Password0 and VirtualHost0 are strings, or 'undefined'
%% when neither the frame nor the configuration supplied a value.
%% Heartbeat is the raw heart-beat header value, AdapterInfo an
%% #adapter_info{} record and Version the negotiated protocol version.
%%
%% Returns ok("CONNECTED", Headers, "", NewState) on success, with the
%% session id, channel and connection stored in NewState, or a
%% "Bad CONNECT" error result otherwise.
do_login(undefined, _, _, _, _, _, State) ->
    error("Bad CONNECT", "Missing login or passcode header(s)\n", State);

%% A login without a passcode used to reach list_to_binary(undefined)
%% and fail with badarg; report it as the client error it is.
do_login(_, undefined, _, _, _, _, State) ->
    error("Bad CONNECT", "Missing login or passcode header(s)\n", State);

do_login(Username0, Password0, VirtualHost0, Heartbeat, AdapterInfo,
         Version, State) ->
    Username    = list_to_binary(Username0),
    Password    = list_to_binary(Password0),
    VirtualHost = list_to_binary(VirtualHost0),
    case rabbit_access_control:check_user_pass_login(Username, Password) of
        {ok, _User} ->
            case amqp_connection:start(
                   #amqp_params_direct{username     = Username,
                                       virtual_host = VirtualHost,
                                       adapter_info = AdapterInfo}) of
                {ok, Connection} ->
                    %% Link so that the connection's exit arrives as an
                    %% 'EXIT' message in handle_info/2 (init/1 traps
                    %% exits).
                    link(Connection),
                    {ok, Channel} = amqp_connection:open_channel(Connection),
                    SessionId =
                        rabbit_guid:string(rabbit_guid:gen_secure(),
                                           "session"),
                    {{SendTimeout, ReceiveTimeout}, State1} =
                        ensure_heartbeats(Heartbeat, State),
                    ok("CONNECTED",
                       [{?HEADER_SESSION, SessionId},
                        {?HEADER_HEART_BEAT,
                         io_lib:format("~B,~B", [SendTimeout,
                                                 ReceiveTimeout])},
                        {?HEADER_VERSION, Version}],
                       "",
                       State1#state{session_id = SessionId,
                                    channel    = Channel,
                                    connection = Connection});
                {error, auth_failure} ->
                    error("Bad CONNECT", "Authentication failure\n", State);
                {error, access_refused} ->
                    error("Bad CONNECT", "Authentication failure\n", State)
            end;
        {refused, _Msg, _Args} ->
            error("Bad CONNECT", "Authentication failure\n", State)
    end.
|
2010-10-28 05:14:19 +08:00
|
|
|
|
2012-02-18 01:52:15 +08:00
|
|
|
%% Builds the #adapter_info{} record for a {SockMod, Socket} transport:
%% local and peer address/port, plus the SSL details from
%% maybe_ssl_info/1. An address lookup that does not return {ok, _} is
%% recorded as {unknown, unknown}.
adapter_info({SockMod, S} = Transport) ->
    OrUnknown = fun ({ok, AddrPort}) -> AddrPort;
                    (_)              -> {unknown, unknown}
                end,
    {Addr, Port}         = OrUnknown(SockMod:sockname(S)),
    {PeerAddr, PeerPort} = OrUnknown(SockMod:peername(S)),
    #adapter_info{address         = Addr,
                  port            = Port,
                  peer_address    = PeerAddr,
                  peer_port       = PeerPort,
                  additional_info = maybe_ssl_info(Transport)}.
|
2011-06-08 01:08:54 +08:00
|
|
|
|
2012-02-16 01:08:54 +08:00
|
|
|
%% Returns the SSL part of the adapter info as a property list:
%% [{ssl, false}] for a plain socket, or {ssl, true} followed by the
%% entries from ssl_info/1 and ssl_cert_info/1 for an SSL socket.
maybe_ssl_info({SockMod, S} = Transport) ->
    case SockMod:is_ssl(S) of
        false ->
            [{ssl, false}];
        true ->
            [{ssl, true} | ssl_info(Transport) ++ ssl_cert_info(Transport)]
    end.
|
|
|
|
|
|
2012-02-16 01:08:54 +08:00
|
|
|
%% Returns the negotiated SSL protocol, key exchange, cipher and hash as
%% a property list. Both the 3- and 4-element cipher-suite tuples are
%% accepted; any other answer from SockMod:ssl_info/1 makes every value
%% 'unknown'.
ssl_info({SockMod, S}) ->
    Values =
        case SockMod:ssl_info(S) of
            {ok, {P, {K, C, H}}}    -> [P, K, C, H];
            {ok, {P, {K, C, H, _}}} -> [P, K, C, H];
            _                       -> lists:duplicate(4, unknown)
        end,
    lists:zip([ssl_protocol, ssl_key_exchange, ssl_cipher, ssl_hash],
              Values).
|
|
|
|
|
|
2012-02-16 01:08:54 +08:00
|
|
|
%% Returns the peer certificate's issuer, subject and validity (as
%% binaries) as a property list, or [] when SockMod:peercert/1 does not
%% return {ok, Cert}.
ssl_cert_info({SockMod, S}) ->
    case SockMod:peercert(S) of
        {ok, Cert} ->
            Extractors =
                [{peer_cert_issuer,   fun rabbit_ssl:peer_cert_issuer/1},
                 {peer_cert_subject,  fun rabbit_ssl:peer_cert_subject/1},
                 {peer_cert_validity, fun rabbit_ssl:peer_cert_validity/1}],
            [{Key, list_to_binary(Extract(Cert))}
             || {Key, Extract} <- Extractors];
        _ ->
            []
    end.
|
|
|
|
|
|
2010-10-28 05:14:19 +08:00
|
|
|
%% Handles SUBSCRIBE for an already-parsed destination.
%%
%% Steps, in order: pick the channel (a fresh one with basic.qos applied
%% when the frame has a 'prefetch-count' header, the main channel
%% otherwise), ensure the queue exists, start consuming with this
%% process as the consumer, bind the queue, and record the subscription
%% under its consumer tag. Returns ok(NewState).
do_subscribe(Destination, DestHdr, Frame,
             State = #state{subscriptions = Subs,
                            dest_queues   = DestQs,
                            connection    = Connection,
                            channel       = MainChannel}) ->
    Channel =
        case rabbit_stomp_frame:integer_header(Frame, "prefetch-count",
                                               undefined) of
            undefined ->
                MainChannel;
            Prefetch ->
                {ok, SubChannel} = amqp_connection:open_channel(Connection),
                Qos = #'basic.qos'{prefetch_size  = 0,
                                   prefetch_count = Prefetch,
                                   global         = false},
                amqp_channel:call(SubChannel, Qos),
                SubChannel
        end,

    {AckMode, IsMulti} = rabbit_stomp_util:ack_mode(Frame),

    {ok, Queue, DestQs1} =
        ensure_queue(subscribe, Destination, Frame, Channel, DestQs),

    {ok, ConsumerTag, Description} = rabbit_stomp_util:consumer_tag(Frame),

    %% Broker-side acknowledgements are only disabled in 'auto' ack mode.
    Consume = #'basic.consume'{queue        = Queue,
                               consumer_tag = ConsumerTag,
                               no_local     = false,
                               no_ack       = (AckMode == auto),
                               exclusive    = false},
    amqp_channel:subscribe(Channel, Consume, self()),

    ExchangeAndKey = rabbit_stomp_util:parse_routing_information(Destination),
    ok = ensure_queue_binding(Queue, ExchangeAndKey, Channel),

    Subscription = #subscription{dest_hdr    = DestHdr,
                                 channel     = Channel,
                                 multi_ack   = IsMulti,
                                 description = Description},
    ok(State#state{subscriptions = dict:store(ConsumerTag, Subscription,
                                              Subs),
                   dest_queues   = DestQs1}).
|
2010-10-28 05:14:19 +08:00
|
|
|
|
|
|
|
|
%% Handles SEND for an already-parsed destination.
%%
%% Ensures the destination queue, rewrites a temp-queue reply-to header
%% (ensure_reply_to/2), derives the message properties and routing
%% information, then publishes: deferred as part of the frame's
%% transaction when it has one, immediately otherwise. In both cases
%% maybe_record_receipt/2 is applied to the state used for the publish.
do_send(Destination, _DestHdr,
        Frame = #stomp_frame{body_iolist = BodyFragments},
        State = #state{channel = Channel, dest_queues = DestQs}) ->
    {ok, _Q, DestQs1} =
        ensure_queue(send, Destination, Frame, Channel, DestQs),

    {Frame1, State1} =
        ensure_reply_to(Frame, State#state{dest_queues = DestQs1}),

    Props = rabbit_stomp_util:message_properties(Frame1),

    {Exchange, RoutingKey} =
        rabbit_stomp_util:parse_routing_information(Destination),

    Publish = #'basic.publish'{exchange    = list_to_binary(Exchange),
                               routing_key = list_to_binary(RoutingKey),
                               mandatory   = false,
                               immediate   = false},

    case transactional(Frame1) of
        no ->
            Recorded = maybe_record_receipt(Frame1, State1),
            ok(send_method(Publish, Props, BodyFragments, Recorded));
        {yes, Transaction} ->
            RecordReceipt =
                fun(StateN) -> maybe_record_receipt(Frame1, StateN) end,
            extend_transaction(Transaction,
                               RecordReceipt,
                               {Publish, Props, BodyFragments},
                               State1)
    end.
|
|
|
|
|
|
2011-01-11 21:57:59 +08:00
|
|
|
%% Builds the basic.ack method for DeliveryTag; 'multiple' is taken from
%% the subscription's multi_ack setting.
create_ack_method(DeliveryTag, #subscription{multi_ack = Multiple}) ->
    #'basic.ack'{multiple     = Multiple,
                 delivery_tag = DeliveryTag}.
|
2011-01-11 19:25:44 +08:00
|
|
|
|
2011-01-17 20:35:45 +08:00
|
|
|
%% Builds the basic.nack method for DeliveryTag; 'multiple' is taken
%% from the subscription's multi_ack setting.
create_nack_method(DeliveryTag, #subscription{multi_ack = Multiple}) ->
    #'basic.nack'{multiple     = Multiple,
                  delivery_tag = DeliveryTag}.
|
2011-01-11 19:25:44 +08:00
|
|
|
|
2010-11-09 19:57:20 +08:00
|
|
|
%% Negotiates the protocol version for a CONNECT frame: the
%% comma-separated accept-version header (taken as "1.0" when absent) is
%% matched against ?SUPPORTED_VERSIONS by
%% rabbit_stomp_util:negotiate_version/2, whose result is returned.
negotiate_version(Frame) ->
    Accepted = rabbit_stomp_frame:header(Frame, ?HEADER_ACCEPT_VERSION,
                                         "1.0"),
    ClientVers = re:split(Accepted, ",", [{return, list}]),
    rabbit_stomp_util:negotiate_version(ClientVers, ?SUPPORTED_VERSIONS).
|
|
|
|
|
|
2010-10-28 05:14:19 +08:00
|
|
|
|
2010-10-28 19:04:40 +08:00
|
|
|
%% Forwards an AMQP delivery to the client as a MESSAGE frame, with
%% headers built from the subscription's destination header, the session
%% id, the delivery and its properties. A delivery for a consumer tag we
%% hold no subscription for is answered with an ERROR frame instead.
send_delivery(#'basic.deliver'{consumer_tag = ConsumerTag} = Delivery,
              Properties, Body,
              State = #state{session_id    = SessionId,
                             subscriptions = Subs}) ->
    case dict:find(ConsumerTag, Subs) of
        error ->
            send_error("Subscription not found",
                       "There is no current subscription with tag '~s'.",
                       [ConsumerTag],
                       State);
        {ok, #subscription{dest_hdr = Destination}} ->
            Headers = rabbit_stomp_util:message_headers(
                        Destination, SessionId, Delivery, Properties),
            send_frame("MESSAGE", Headers, Body, State)
    end.
|
2010-10-28 05:14:19 +08:00
|
|
|
|
2011-01-06 23:54:05 +08:00
|
|
|
%% send_method/3: issues Method synchronously on the given channel and
%% returns State unchanged.
send_method(Method, Channel, State) ->
    amqp_channel:call(Channel, Method),
    State.

%% send_method/2: as send_method/3, on the state's main channel.
send_method(Method, State = #state{channel = MainChannel}) ->
    amqp_channel:call(MainChannel, Method),
    State.

%% send_method/4: as send_method/5, on the state's main channel.
send_method(Method, Properties, BodyFragments,
            State = #state{channel = MainChannel}) ->
    send_method(Method, MainChannel, Properties, BodyFragments, State).

%% send_method/5: publishes a message with cast_flow on the given
%% channel. Only defined for basic.publish. The body fragments are
%% flattened into one binary payload. Returns State unchanged.
send_method(#'basic.publish'{} = Method, Channel, Properties, BodyFragments,
            State) ->
    Msg = #amqp_msg{props   = Properties,
                    payload = list_to_binary(BodyFragments)},
    amqp_channel:cast_flow(Channel, Method, Msg),
    State.
|
|
|
|
|
|
2011-08-25 19:35:38 +08:00
|
|
|
%% Close the AMQP connection held in the state; closing the connection
%% also closes the channel and subchannels. The close is wrapped in
%% `catch' on purpose: noproc and any other exception are ignored so
%% that no debris is left behind. The returned state has its channel,
%% connection and subscriptions fields reset to `none'.
close_connection(State = #state{connection = Connection}) ->
    catch amqp_connection:close(Connection),
    State#state{channel       = none,
                connection    = none,
                subscriptions = none}.
|
2010-10-28 05:14:19 +08:00
|
|
|
|
2011-07-19 23:15:59 +08:00
|
|
|
%%----------------------------------------------------------------------------
|
|
|
|
|
%% Reply-To
|
|
|
|
|
%%----------------------------------------------------------------------------
|
|
|
|
|
%% Rewrite the reply-to header of a frame when it names a temp queue.
%%
%% Returns {Frame1, State1}:
%%   * no reply-to header           -> frame and state are returned as-is;
%%   * reply-to parses to
%%     {temp_queue, Id}             -> the header value is replaced with
%%                                     the destination returned by
%%                                     ensure_reply_queue/2, and the state
%%                                     returned by that call is used;
%%   * any other parsed destination -> frame and state are returned as-is.
%%
%% The parse result is asserted to be {ok, _}; anything else fails with
%% badmatch.
ensure_reply_to(Frame = #stomp_frame{headers = Headers}, State) ->
    case rabbit_stomp_frame:header(Frame, ?HEADER_REPLY_TO) of
        {ok, ReplyTo} ->
            {ok, Destination} = rabbit_stomp_util:parse_destination(ReplyTo),
            case Destination of
                {temp_queue, TempQueueId} ->
                    {ReplyQueue, State1} =
                        ensure_reply_queue(TempQueueId, State),
                    Headers1 = lists:keyreplace(
                                 ?HEADER_REPLY_TO, 1, Headers,
                                 {?HEADER_REPLY_TO, ReplyQueue}),
                    {Frame#stomp_frame{headers = Headers1}, State1};
                _ ->
                    {Frame, State}
            end;
        not_found ->
            {Frame, State}
    end.
|
|
|
|
|
|
|
|
|
|
%% Resolve a temp-queue id to a reply destination, creating the queue
%% behind it the first time the id is seen.
%%
%% Returns {Destination, State1}. When the id is already present in the
%% reply_queues dict the stored queue name is converted with
%% reply_to_destination/1 and the state is returned unchanged.
%% Otherwise:
%%   1. an auto-delete, exclusive queue is declared (the name comes back
%%      in queue.declare_ok);
%%   2. this process subscribes to it with no_ack = true;
%%   3. a subscription record for the new consumer tag is stored so that
%%      deliveries from the reply queue can be matched to a destination;
%%   4. the id -> queue mapping is recorded in reply_queues.
ensure_reply_queue(TempQueueId, State = #state{channel       = Channel,
                                               reply_queues  = RQS,
                                               subscriptions = Subs}) ->
    case dict:find(TempQueueId, RQS) of
        {ok, RQ} ->
            {reply_to_destination(RQ), State};
        error ->
            Declare = #'queue.declare'{auto_delete = true,
                                       exclusive   = true},
            #'queue.declare_ok'{queue = Queue} =
                amqp_channel:call(Channel, Declare),

            Consume = #'basic.consume'{queue  = Queue,
                                       no_ack = true,
                                       nowait = false},
            #'basic.consume_ok'{consumer_tag = ConsumerTag} =
                amqp_channel:subscribe(Channel, Consume, self()),

            Destination = reply_to_destination(Queue),

            %% synthesise a subscription to the reply queue destination
            ReplySub = #subscription{dest_hdr  = Destination,
                                     channel   = Channel,
                                     multi_ack = false},
            Subs1 = dict:store(ConsumerTag, ReplySub, Subs),
            RQS1  = dict:store(TempQueueId, Queue, RQS),

            {Destination, State#state{reply_queues  = RQS1,
                                      subscriptions = Subs1}}
    end.
|
|
|
|
|
|
2011-11-10 18:20:03 +08:00
|
|
|
%% Build the destination string for a reply queue: the queue name
%% (a binary) converted to a list and prefixed with
%% ?REPLY_QUEUE_PREFIX.
reply_to_destination(Queue) ->
    QueueName = binary_to_list(Queue),
    ?REPLY_QUEUE_PREFIX ++ QueueName.
|
2011-07-19 23:15:59 +08:00
|
|
|
|
2011-02-16 17:56:39 +08:00
|
|
|
%%----------------------------------------------------------------------------
|
|
|
|
|
%% Receipt Handling
|
|
|
|
|
%%----------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
%% Handle the receipt header of a frame, if it has one. With a receipt
%% header the frame's command and the receipt id are passed to
%% do_receipt/3 and its result is returned; without one the state is
%% returned unchanged.
ensure_receipt(Frame = #stomp_frame{command = Command}, State) ->
    case rabbit_stomp_frame:header(Frame, ?HEADER_RECEIPT) of
        not_found -> State;
        {ok, Id}  -> do_receipt(Command, Id, State)
    end.
|
|
|
|
|
|
|
|
|
|
%% Send a RECEIPT frame for ReceiptId, except for the "SEND" command.
%% The first argument is the command the receipt belongs to; callers
%% that are not tied to a command pass some other term.
do_receipt("SEND", _ReceiptId, State) ->
    %% SEND frame receipts are handled when messages are confirmed
    State;
do_receipt(_Command, ReceiptId, State) ->
    send_frame("RECEIPT", [{"receipt-id", ReceiptId}], "", State).
|
|
|
|
|
|
|
|
|
|
%% Remember the receipt id of a frame against the sequence number of
%% the next publish on the channel, so the receipt can be issued once
%% that publish is confirmed.
%%
%% Frames without a receipt header leave the state unchanged. The first
%% time a receipt is recorded (pending_receipts is still `undefined')
%% this process is registered as the channel's confirm handler, the
%% channel is switched to confirm mode, and an empty gb_tree is
%% created to hold the pending receipts.
maybe_record_receipt(Frame, State = #state{channel          = Channel,
                                           pending_receipts = PR}) ->
    case rabbit_stomp_frame:header(Frame, ?HEADER_RECEIPT) of
        not_found ->
            State;
        {ok, Id} ->
            Pending =
                case PR of
                    undefined ->
                        amqp_channel:register_confirm_handler(
                          Channel, self()),
                        #'confirm.select_ok'{} =
                            amqp_channel:call(Channel, #'confirm.select'{}),
                        gb_trees:empty();
                    _ ->
                        PR
                end,
            SeqNo = amqp_channel:next_publish_seqno(Channel),
            Pending1 = gb_trees:insert(SeqNo, Id, Pending),
            State#state{pending_receipts = Pending1}
    end.
|
|
|
|
|
|
|
|
|
|
%% Issue the receipts selected by DeliveryTag / IsMulti and drop them
%% from the pending set. accumulate_receipts/3 picks the receipt ids
%% and returns the remaining tree; each id is then passed through
%% do_receipt/3 (with `none' as the command), threading the state, and
%% the remaining tree is stored back in the state.
flush_pending_receipts(DeliveryTag, IsMulti,
                       State = #state{pending_receipts = PR}) ->
    {Receipts, PR1} = accumulate_receipts(DeliveryTag, IsMulti, PR),
    SendReceipt = fun (ReceiptId, StateN) ->
                          do_receipt(none, ReceiptId, StateN)
                  end,
    State1 = lists:foldl(SendReceipt, State, Receipts),
    State1#state{pending_receipts = PR1}.
|
|
|
|
|
|
|
|
|
|
%% Select pending receipts for a delivery tag. Returns
%% {ReceiptIds, RemainingTree}.
%%
%% Second argument `false': only the entry stored under exactly
%% DeliveryTag is taken (or nothing, if there is no such entry).
%% Second argument `true': entries are taken smallest key first by
%% accumulate_receipts1/3, which stops at the first key greater than
%% DeliveryTag; an empty tree yields no receipts.
accumulate_receipts(DeliveryTag, false, PR) ->
    case gb_trees:lookup(DeliveryTag, PR) of
        none ->
            {[], PR};
        {value, ReceiptId} ->
            {[ReceiptId], gb_trees:delete(DeliveryTag, PR)}
    end;

accumulate_receipts(DeliveryTag, true, PR) ->
    case gb_trees:is_empty(PR) of
        false ->
            Smallest = gb_trees:take_smallest(PR),
            accumulate_receipts1(DeliveryTag, Smallest, []);
        true ->
            {[], PR}
    end.
|
2011-02-16 17:56:39 +08:00
|
|
|
|
2011-08-26 01:43:24 +08:00
|
|
|
%% Worker for the multi-ack case of accumulate_receipts/3.
%%
%% The second argument is the result of gb_trees:take_smallest/1:
%% {Key, Value, RestOfTree}. While Key does not exceed DeliveryTag the
%% value is collected and the next smallest entry is taken. As soon as
%% a key greater than DeliveryTag is seen it is put back into the tree
%% and the walk stops. Returns {Values, RemainingTree} with Values in
%% ascending key order.
accumulate_receipts1(DeliveryTag, {Key, Value, PR}, Acc) ->
    case Key > DeliveryTag of
        true ->
            %% went one entry too far: restore it and finish
            {lists:reverse(Acc), gb_trees:insert(Key, Value, PR)};
        false ->
            Acc1 = [Value | Acc],
            case gb_trees:is_empty(PR) of
                false ->
                    Next = gb_trees:take_smallest(PR),
                    accumulate_receipts1(DeliveryTag, Next, Acc1);
                true ->
                    {lists:reverse(Acc1), PR}
            end
    end.
|
|
|
|
|
|
2011-02-16 17:56:39 +08:00
|
|
|
|
2010-10-28 05:14:19 +08:00
|
|
|
%%----------------------------------------------------------------------------
|
|
|
|
|
%% Transaction Support
|
|
|
|
|
%%----------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
%% Tell whether a frame carries a "transaction" header. Returns
%% {yes, Transaction} with the header value, or `no' when the header is
%% absent.
transactional(Frame) ->
    case rabbit_stomp_frame:header(Frame, "transaction") of
        not_found         -> no;
        {ok, Transaction} -> {yes, Transaction}
    end.
|
|
|
|
|
|
|
|
|
|
%% Run Fun for a frame that is required to name a transaction. When the
%% frame has a transaction header, Fun(Transaction, State) is called and
%% its result returned. Otherwise a "Missing transaction" error is
%% produced through error/4, using Name to say which command lacked the
%% header.
transactional_action(Frame, Name, Fun, State) ->
    case transactional(Frame) of
        no ->
            error("Missing transaction",
                  "~p must include a 'transaction' header\n",
                  [Name],
                  State);
        {yes, Transaction} ->
            Fun(Transaction, State)
    end.
|
|
|
|
|
|
|
|
|
|
%% Apply Fun to the actions recorded for Transaction. Transactions live
%% in the process dictionary under the key {transaction, Transaction}.
%% If no value is stored under that key a "Bad transaction" error is
%% produced through error/4; otherwise Fun(Actions, State) is called
%% and its result returned.
with_transaction(Transaction, State, Fun) ->
    Key = {transaction, Transaction},
    case get(Key) of
        undefined ->
            error("Bad transaction",
                  "Invalid transaction identifier: ~p\n",
                  [Transaction],
                  State);
        Actions ->
            Fun(Actions, State)
    end.
|
|
|
|
|
|
|
|
|
|
%% Start a transaction by storing an empty action list for it in the
%% process dictionary (any list already stored under the same key is
%% overwritten). Returns ok(State).
begin_transaction(Transaction, State) ->
    Key = {transaction, Transaction},
    put(Key, []),
    ok(State).
|
2010-10-28 05:14:19 +08:00
|
|
|
|
2011-02-16 19:54:13 +08:00
|
|
|
%% Record Action in a transaction together with a Callback. The pair is
%% wrapped as {callback, Callback, Action} and passed to
%% extend_transaction/3.
extend_transaction(Transaction, Callback, Action, State) ->
    Wrapped = {callback, Callback, Action},
    extend_transaction(Transaction, Wrapped, State).
|
|
|
|
|
|
2010-10-28 05:14:19 +08:00
|
|
|
%% Add Action to an existing transaction. The action is prepended to
%% the list stored in the process dictionary, so the stored list is in
%% newest-first order. An unknown transaction is reported by
%% with_transaction/3.
extend_transaction(Transaction, Action, State0) ->
    Extend = fun (Actions, State) ->
                     put({transaction, Transaction}, [Action | Actions]),
                     ok(State)
             end,
    with_transaction(Transaction, State0, Extend).
|
|
|
|
|
|
|
|
|
|
%% Commit a transaction: every recorded action is run through
%% perform_transaction_action/2, threading the state, then the
%% transaction is erased from the process dictionary. lists:foldr/3 is
%% used, so the stored list is processed from its last element to its
%% first. An unknown transaction is reported by with_transaction/3.
commit_transaction(Transaction, State0) ->
    Commit = fun (Actions, State) ->
                     FinalState = lists:foldr(
                                    fun perform_transaction_action/2,
                                    State, Actions),
                     erase({transaction, Transaction}),
                     ok(FinalState)
             end,
    with_transaction(Transaction, State0, Commit).
|
|
|
|
|
|
|
|
|
|
%% Abort a transaction: the recorded actions are discarded without
%% being performed and the transaction is erased from the process
%% dictionary. An unknown transaction is reported by
%% with_transaction/3.
abort_transaction(Transaction, State0) ->
    Abort = fun (_Actions, State) ->
                    erase({transaction, Transaction}),
                    ok(State)
            end,
    with_transaction(Transaction, State0, Abort).
|
|
|
|
|
|
2011-02-16 19:54:13 +08:00
|
|
|
%% Execute one recorded transaction action and return the new state.
%% The shape of the action tuple selects what is done:
%%   {callback, Callback, Action} -> Callback(State) is applied first,
%%                                   then Action is performed on the
%%                                   state it returns;
%%   {Method}                     -> send_method/2;
%%   {Channel, Method}            -> send_method/3;
%%   {Method, Props, Body}        -> send_method/4.
%% The callback clause must stay ahead of the generic three-element
%% clause, which would otherwise match it.
perform_transaction_action({callback, Callback, Action}, State) ->
    State1 = Callback(State),
    perform_transaction_action(Action, State1);
perform_transaction_action({Method}, State) ->
    send_method(Method, State);
perform_transaction_action({Channel, Method}, State) ->
    send_method(Method, Channel, State);
perform_transaction_action({Method, Props, BodyFragments}, State) ->
    send_method(Method, Props, BodyFragments, State).
|
|
|
|
|
|
2010-11-10 03:10:56 +08:00
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
%% Heartbeat Management
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
2010-11-11 05:09:20 +08:00
|
|
|
%% Start heartbeating for a connection.
%%
%% Heartbeats must be a string of exactly two comma-separated integers,
%% "CX,CY" (anything else fails the match below). Both values are
%% converted from milliseconds to whole seconds by millis_to_seconds/1:
%% CY becomes the send timeout and CX the receive timeout.
%%
%% The state's start_heartbeat_fun is called with the socket, the two
%% timeouts, a fun that sends a single newline through the state's
%% send_frame fun, and a fun that casts `client_timeout' to this
%% process.
%%
%% Returns {{SendMillis, ReceiveMillis}, State}, the rounded timeouts
%% converted back to milliseconds.
ensure_heartbeats(Heartbeats,
                  State = #state{transport           = {_SockMod, S},
                                 start_heartbeat_fun = SHF,
                                 send_frame          = SendFrame}) ->
    Parts = re:split(Heartbeats, ",", [{return, list}]),
    [CX, CY] = lists:map(fun list_to_integer/1, Parts),

    Pid = self(),
    SendFun    = fun() -> SendFrame(<<$\n>>) end,
    ReceiveFun = fun() -> gen_server2:cast(Pid, client_timeout) end,

    SendTimeout    = millis_to_seconds(CY),
    ReceiveTimeout = millis_to_seconds(CX),

    SHF(S, SendTimeout, SendFun, ReceiveTimeout, ReceiveFun),

    {{SendTimeout * 1000, ReceiveTimeout * 1000}, State}.
|
|
|
|
|
|
2011-09-10 00:02:11 +08:00
|
|
|
%% Convert a millisecond count to whole seconds.
%%   * zero or negative      -> 0
%%   * 1 .. 999              -> 1 (a non-zero value never becomes 0)
%%   * 1000 and above        -> integer division by 1000
millis_to_seconds(M) ->
    if
        M =< 0   -> 0;
        M < 1000 -> 1;
        true     -> M div 1000
    end.
|
2010-11-10 03:10:56 +08:00
|
|
|
|
2010-10-28 05:14:19 +08:00
|
|
|
%%----------------------------------------------------------------------------
|
|
|
|
|
%% Queue and Binding Setup
|
|
|
|
|
%%----------------------------------------------------------------------------
|
|
|
|
|
|
2012-01-19 03:38:44 +08:00
|
|
|
%% Make sure the queue needed for an operation on a destination exists.
%%
%% Arguments: the operation (`subscribe' or `send'), the parsed
%% destination {Type, Name}, the frame, the channel, and DestQs, a set
%% of queue names that have already been declared.
%%
%% Returns {ok, Queue, DestQs1}, where Queue is the queue name as a
%% binary, or `undefined' when no queue is involved.

%% SUBSCRIBE on /exchange: create an anonymous, auto-delete, exclusive
%% queue.
ensure_queue(subscribe, {exchange, _}, _Frame, Channel, DestQs) ->
    Declare = #'queue.declare'{auto_delete = true,
                               exclusive   = true},
    #'queue.declare_ok'{queue = Queue} = amqp_channel:call(Channel, Declare),
    {ok, Queue, DestQs};

%% SEND to /exchange: never creates a queue.
ensure_queue(send, {exchange, _}, _Frame, _Channel, DestQs) ->
    {ok, undefined, DestQs};

%% /queue destinations: the named, durable queue is declared the first
%% time it is encountered (with nowait, via cast) and its name is added
%% to DestQs so the declare is not repeated.
ensure_queue(_, {queue, Name}, _Frame, Channel, DestQs) ->
    Queue = list_to_binary(Name),
    DestQs1 =
        case sets:is_element(Queue, DestQs) of
            false ->
                Declare = #'queue.declare'{durable = true,
                                           queue   = Queue,
                                           nowait  = true},
                amqp_channel:cast(Channel, Declare),
                sets:add_element(Queue, DestQs);
            true ->
                DestQs
        end,
    {ok, Queue, DestQs1};

%% SUBSCRIBE on /topic: transient subscriptions get an anonymous,
%% auto-delete, exclusive queue. Durable subscriptions (persistent
%% header set) get a shared, named, durable queue whose name is derived
%% from the topic name and the frame's id header.
ensure_queue(subscribe, {topic, Name}, Frame, Channel, DestQs) ->
    Persistent = rabbit_stomp_frame:boolean_header(
                   Frame, ?HEADER_PERSISTENT, false),
    Method =
        case Persistent of
            false ->
                #'queue.declare'{auto_delete = true, exclusive = true};
            true ->
                {ok, Id} = rabbit_stomp_frame:header(Frame, ?HEADER_ID),
                QName = rabbit_stomp_util:durable_subscription_queue(
                          Name, Id),
                #'queue.declare'{durable = true, queue = QName}
        end,
    #'queue.declare_ok'{queue = Queue} = amqp_channel:call(Channel, Method),
    {ok, Queue, DestQs};

%% SEND to /topic: never creates a queue.
ensure_queue(send, {topic, _}, _Frame, _Channel, DestQs) ->
    {ok, undefined, DestQs};

%% reply_queue and amqqueue destinations: nothing is declared here; the
%% name is simply converted to a binary.
ensure_queue(_, {Type, Name}, _Frame, _Channel, DestQs)
  when Type =:= reply_queue; Type =:= amqqueue ->
    {ok, list_to_binary(Name), DestQs}.
|
2010-10-28 05:14:19 +08:00
|
|
|
|
2010-11-09 20:04:30 +08:00
|
|
|
%% Bind a queue to an exchange with a routing key. Returns `ok'.
%%
%% For the default exchange ("") no bind is issued: we should only be
%% asked to bind to the default exchange a queue with its own name, so
%% the clause just asserts that the routing key equals the queue name
%% (failing with badmatch otherwise).
ensure_queue_binding(QueueBin, {"", Queue}, _Channel) ->
    QueueBin = list_to_binary(Queue),
    ok;
ensure_queue_binding(Queue, {Exchange, RoutingKey}, Channel) ->
    Bind = #'queue.bind'{queue       = Queue,
                         exchange    = list_to_binary(Exchange),
                         routing_key = list_to_binary(RoutingKey)},
    #'queue.bind_ok'{} = amqp_channel:call(Channel, Bind),
    ok.
|
2010-11-26 19:10:43 +08:00
|
|
|
%%----------------------------------------------------------------------------
|
|
|
|
|
%% Success/error handling
|
|
|
|
|
%%----------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
%% Success result that carries no reply frame: {ok, none, State}.
ok(State) ->
    {ok, none, State}.
|
|
|
|
|
|
|
|
|
|
%% Success result carrying a reply frame: {ok, Frame, State}, where
%% Frame is a #stomp_frame{} built from the given command, headers and
%% body fragments.
ok(Command, Headers, BodyFragments, State) ->
    Frame = #stomp_frame{command     = Command,
                         headers     = Headers,
                         body_iolist = BodyFragments},
    {ok, Frame, State}.
|
|
|
|
|
|
2011-01-17 18:38:43 +08:00
|
|
|
%% Report an AMQP error to the client and stop. ReplyCode is translated
%% to an exception name by ?PROTOCOL:amqp_exception/1, the explanation
%% is formatted with a trailing newline, the error is logged, and an
%% ERROR frame is sent. Returns {stop, normal, State1}, where State1 is
%% the state returned by send_error/3.
amqp_death(ReplyCode, Explanation, State) ->
    ErrorName = ?PROTOCOL:amqp_exception(ReplyCode),
    ErrorDesc = rabbit_misc:format("~s~n", [Explanation]),
    log_error(ErrorName, ErrorDesc, none),
    State1 = send_error(atom_to_list(ErrorName), ErrorDesc, State),
    {stop, normal, State1}.
|
2010-11-26 19:10:43 +08:00
|
|
|
|
|
|
|
|
%% Error result with a ready-made detail string and no server-private
%% detail; delegates to priv_error/4.
error(Message, Detail, State) ->
    NoPrivateDetail = none,
    priv_error(Message, Detail, NoPrivateDetail, State).
|
|
|
|
|
|
|
|
|
|
%% Error result whose detail is given as a format string plus
%% arguments, with no server-private detail; delegates to priv_error/5.
error(Message, Format, Args, State) ->
    NoPrivateDetail = none,
    priv_error(Message, Format, Args, NoPrivateDetail, State).
|
|
|
|
|
|
|
|
|
|
%% Log an error and build the error result {error, Message, Detail,
%% State}. ServerPrivateDetail is passed to log_error/3 only; it is not
%% part of the returned tuple.
priv_error(Message, Detail, ServerPrivateDetail, State) ->
    log_error(Message, Detail, ServerPrivateDetail),
    Result = {error, Message, Detail, State},
    Result.
|
|
|
|
|
|
|
|
|
|
%% As priv_error/4, but the detail is produced by formatting Format
%% with Args through rabbit_misc:format/2.
priv_error(Message, Format, Args, ServerPrivateDetail, State) ->
    Detail = rabbit_misc:format(Format, Args),
    priv_error(Message, Detail, ServerPrivateDetail, State).
|
|
|
|
|
|
|
|
|
|
%% Write an error-level log entry, via rabbit_log:error/2, containing
%% the message, the detail, and the server-private detail.
log_error(Message, Detail, ServerPrivateDetail) ->
    Format = "STOMP error frame sent:~n"
             "Message: ~p~n"
             "Detail: ~p~n"
             "Server private detail: ~p~n",
    rabbit_log:error(Format, [Message, Detail, ServerPrivateDetail]).
|
2010-11-26 19:10:43 +08:00
|
|
|
|
2010-10-28 05:14:19 +08:00
|
|
|
%%----------------------------------------------------------------------------
|
|
|
|
|
%% Frame sending utilities
|
|
|
|
|
%%----------------------------------------------------------------------------
|
|
|
|
|
%% Build a #stomp_frame{} from a command, headers and body fragments
%% and send it with send_frame/2. Returns the state returned by that
%% call.
send_frame(Command, Headers, BodyFragments, State) ->
    Frame = #stomp_frame{command     = Command,
                         headers     = Headers,
                         body_iolist = BodyFragments},
    send_frame(Frame, State).
|
|
|
|
|
|
2012-02-18 01:52:15 +08:00
|
|
|
%% Send a frame by applying the send_frame fun stored in the state to
%% it. The fun's result is ignored and State is returned unchanged.
send_frame(Frame, #state{send_frame = SendFrame} = State) ->
    SendFrame(Frame),
    State.
|
2010-10-28 05:14:19 +08:00
|
|
|
|
|
|
|
|
%% Send an ERROR frame. Message goes into the "message" header, Detail
%% becomes the frame body, the content type is text/plain, and the
%% "version" header lists ?SUPPORTED_VERSIONS joined with commas.
%% Returns the state returned by send_frame/4.
send_error(Message, Detail, State) ->
    Versions = string:join(?SUPPORTED_VERSIONS, ","),
    Headers = [{"message", Message},
               {"content-type", "text/plain"},
               {"version", Versions}],
    send_frame("ERROR", Headers, Detail, State).
|
2010-10-28 05:14:19 +08:00
|
|
|
|
2011-01-13 22:18:25 +08:00
|
|
|
%% As send_error/3, but the detail is produced by formatting Format
%% with Args through rabbit_misc:format/2.
send_error(Message, Format, Args, State) ->
    Detail = rabbit_misc:format(Format, Args),
    send_error(Message, Detail, State).
|
2011-01-13 22:18:25 +08:00
|
|
|
|
2010-10-28 05:14:19 +08:00
|
|
|
%%----------------------------------------------------------------------------
|
2011-01-31 21:00:15 +08:00
|
|
|
%% Skeleton gen_server2 callbacks
|
2010-10-28 05:14:19 +08:00
|
|
|
%%----------------------------------------------------------------------------
|
2011-02-10 00:35:03 +08:00
|
|
|
%% No synchronous calls are handled here: every call is ignored and
%% the state is kept as it is. Note that {noreply, State} means no
%% reply is sent to the caller from this callback.
handle_call(_Request, _Caller, State) ->
    {noreply, State}.
|
2010-10-28 05:14:19 +08:00
|
|
|
|
|
|
|
|
%% Code upgrade callback: the state needs no conversion and is
%% returned as-is.
code_change(_PreviousVsn, State, _ExtraArgs) ->
    {ok, State}.
</code_update_placeholder>
|