Untabify.

This commit is contained in:
Tony Garnock-Jones 2009-02-16 17:14:03 +00:00
parent bb29f8249a
commit ad6e392939
2 changed files with 381 additions and 381 deletions

View File

@ -35,9 +35,9 @@
-module(rabbit_stomp). -module(rabbit_stomp).
-export([kickstart/0, -export([kickstart/0,
start/1, start/1,
listener_started/2, listener_stopped/2, start_client/1, listener_started/2, listener_stopped/2, start_client/1,
start_link/0, init/1, mainloop/1]). start_link/0, init/1, mainloop/1]).
-include("rabbit.hrl"). -include("rabbit.hrl").
-include("rabbit_framing.hrl"). -include("rabbit_framing.hrl").
@ -64,18 +64,18 @@ start_listeners([]) ->
ok; ok;
start_listeners([{Host, Port} | More]) -> start_listeners([{Host, Port} | More]) ->
{IPAddress, Name} = rabbit_networking:check_tcp_listener_address(rabbit_stomp_listener_sup, {IPAddress, Name} = rabbit_networking:check_tcp_listener_address(rabbit_stomp_listener_sup,
Host, Host,
Port), Port),
{ok,_} = supervisor:start_child( {ok,_} = supervisor:start_child(
rabbit_sup, rabbit_sup,
{Name, {Name,
{tcp_listener_sup, start_link, {tcp_listener_sup, start_link,
[IPAddress, Port, [IPAddress, Port,
[{packet, raw}, [{packet, raw},
{reuseaddr, true}], {reuseaddr, true}],
{?MODULE, listener_started, []}, {?MODULE, listener_started, []},
{?MODULE, listener_stopped, []}, {?MODULE, listener_stopped, []},
{?MODULE, start_client, []}]}, {?MODULE, start_client, []}]},
transient, infinity, supervisor, [tcp_listener_sup]}), transient, infinity, supervisor, [tcp_listener_sup]}),
start_listeners(More). start_listeners(More).
@ -97,62 +97,62 @@ start_link() ->
init(_Parent) -> init(_Parent) ->
receive receive
{go, Sock} -> {go, Sock} ->
ok = inet:setopts(Sock, [{active, true}]), ok = inet:setopts(Sock, [{active, true}]),
process_flag(trap_exit, true), process_flag(trap_exit, true),
{ok, {PeerAddress, PeerPort}} = inet:peername(Sock), {ok, {PeerAddress, PeerPort}} = inet:peername(Sock),
PeerAddressS = inet_parse:ntoa(PeerAddress), PeerAddressS = inet_parse:ntoa(PeerAddress),
error_logger:info_msg("starting STOMP connection ~p from ~s:~p~n", error_logger:info_msg("starting STOMP connection ~p from ~s:~p~n",
[self(), PeerAddressS, PeerPort]), [self(), PeerAddressS, PeerPort]),
?MODULE:mainloop(#state{socket = Sock, ?MODULE:mainloop(#state{socket = Sock,
channel = none, channel = none,
parse_state = stomp_frame:initial_state()}), parse_state = stomp_frame:initial_state()}),
error_logger:info_msg("ending STOMP connection ~p from ~s:~p~n", error_logger:info_msg("ending STOMP connection ~p from ~s:~p~n",
[self(), PeerAddressS, PeerPort]) [self(), PeerAddressS, PeerPort])
end. end.
mainloop(State) -> mainloop(State) ->
receive receive
E = {'EXIT', _Pid, _Reason} -> E = {'EXIT', _Pid, _Reason} ->
handle_exit(E, State); handle_exit(E, State);
{tcp, _Sock, Bytes} -> {tcp, _Sock, Bytes} ->
process_received_bytes(Bytes, State); process_received_bytes(Bytes, State);
{tcp_closed, _Sock} -> {tcp_closed, _Sock} ->
case State#state.channel of case State#state.channel of
none -> none ->
done; done;
ChPid -> ChPid ->
rabbit_channel:shutdown(ChPid), rabbit_channel:shutdown(ChPid),
?MODULE:mainloop(State) ?MODULE:mainloop(State)
end; end;
{send_command, Command} -> {send_command, Command} ->
?MODULE:mainloop(send_reply(Command, State)); ?MODULE:mainloop(send_reply(Command, State));
{send_command_and_notify, QPid, TxPid, Method, Content} -> {send_command_and_notify, QPid, TxPid, Method, Content} ->
State1 = send_reply(Method, Content, State), State1 = send_reply(Method, Content, State),
rabbit_amqqueue:notify_sent(QPid, TxPid), rabbit_amqqueue:notify_sent(QPid, TxPid),
?MODULE:mainloop(State1); ?MODULE:mainloop(State1);
{send_command_and_shutdown, Command} -> {send_command_and_shutdown, Command} ->
send_reply(Command, State), send_reply(Command, State),
done; done;
shutdown -> shutdown ->
%% This is the channel telling the writer to shut down. We %% This is the channel telling the writer to shut down. We
%% ignore this, as the channel will exit itself shortly, %% ignore this, as the channel will exit itself shortly,
%% which event we do respond to. %% which event we do respond to.
?MODULE:mainloop(State); ?MODULE:mainloop(State);
Data -> Data ->
error_logger:error_msg("Internal error: unknown STOMP Data: ~p~n", [Data]), error_logger:error_msg("Internal error: unknown STOMP Data: ~p~n", [Data]),
?MODULE:mainloop(State) ?MODULE:mainloop(State)
end. end.
simple_method_sync_rpc(Method, State0) -> simple_method_sync_rpc(Method, State0) ->
State = send_method(Method, State0), State = send_method(Method, State0),
receive receive
E = {'EXIT', _Pid, _Reason} -> E = {'EXIT', _Pid, _Reason} ->
handle_exit(E, State); handle_exit(E, State);
{send_command, Reply} -> {send_command, Reply} ->
{ok, Reply, State} {ok, Reply, State}
end. end.
handle_exit({'EXIT', _Pid, normal}, _State) -> handle_exit({'EXIT', _Pid, normal}, _State) ->
@ -173,26 +173,26 @@ process_received_bytes([], State) ->
?MODULE:mainloop(State); ?MODULE:mainloop(State);
process_received_bytes(Bytes, State = #state{parse_state = ParseState}) -> process_received_bytes(Bytes, State = #state{parse_state = ParseState}) ->
case stomp_frame:parse(Bytes, ParseState) of case stomp_frame:parse(Bytes, ParseState) of
{more, ParseState1} -> {more, ParseState1} ->
?MODULE:mainloop(State#state{parse_state = ParseState1}); ?MODULE:mainloop(State#state{parse_state = ParseState1});
{ok, Frame = #stomp_frame{command = Command}, Rest} -> {ok, Frame = #stomp_frame{command = Command}, Rest} ->
%% io:format("Frame: ~p~n", [Frame]), %% io:format("Frame: ~p~n", [Frame]),
case catch process_frame(Command, Frame, case catch process_frame(Command, Frame,
State#state{parse_state = stomp_frame:initial_state()}) of State#state{parse_state = stomp_frame:initial_state()}) of
{'EXIT', {amqp, Code, Method}} -> {'EXIT', {amqp, Code, Method}} ->
explain_amqp_death(Code, Method, State), explain_amqp_death(Code, Method, State),
done; done;
{'EXIT', Reason} -> {'EXIT', Reason} ->
send_error("Processing error", "~p~n", [Reason], State), send_error("Processing error", "~p~n", [Reason], State),
done; done;
{ok, NewState} -> {ok, NewState} ->
process_received_bytes(Rest, NewState); process_received_bytes(Rest, NewState);
stop -> stop ->
done done
end; end;
{error, Reason} -> {error, Reason} ->
send_error("Invalid frame", "Could not parse frame: ~p~n", [Reason], State), send_error("Invalid frame", "Could not parse frame: ~p~n", [Reason], State),
done done
end. end.
explain_amqp_death(Code, Method, State) -> explain_amqp_death(Code, Method, State) ->
@ -210,45 +210,45 @@ maybe_header(Key, Value) when is_integer(Value) -> [{Key, integer_to_list(Value)
maybe_header(_Key, _Value) -> []. maybe_header(_Key, _Value) -> [].
send_reply(#'basic.deliver'{consumer_tag = ConsumerTag, send_reply(#'basic.deliver'{consumer_tag = ConsumerTag,
delivery_tag = DeliveryTag, delivery_tag = DeliveryTag,
exchange = Exchange, exchange = Exchange,
routing_key = RoutingKey}, routing_key = RoutingKey},
#content{properties = #'P_basic'{headers = Headers, #content{properties = #'P_basic'{headers = Headers,
content_type = ContentType, content_type = ContentType,
content_encoding = ContentEncoding, content_encoding = ContentEncoding,
delivery_mode = DeliveryMode, delivery_mode = DeliveryMode,
priority = Priority, priority = Priority,
correlation_id = CorrelationId, correlation_id = CorrelationId,
reply_to = ReplyTo, reply_to = ReplyTo,
message_id = MessageId}, message_id = MessageId},
payload_fragments_rev = BodyFragmentsRev}, payload_fragments_rev = BodyFragmentsRev},
State = #state{session_id = SessionId}) -> State = #state{session_id = SessionId}) ->
send_frame("MESSAGE", send_frame("MESSAGE",
[{"destination", binary_to_list(RoutingKey)}, [{"destination", binary_to_list(RoutingKey)},
{"exchange", binary_to_list(Exchange)}, {"exchange", binary_to_list(Exchange)},
%% TODO append ContentEncoding as ContentType; charset=ContentEncoding? %% TODO append ContentEncoding as ContentType; charset=ContentEncoding?
%% The STOMP SEND handle could also parse "content-type" to split it, perhaps? %% The STOMP SEND handle could also parse "content-type" to split it, perhaps?
{"message-id", SessionId ++ "_" ++ integer_to_list(DeliveryTag)}] {"message-id", SessionId ++ "_" ++ integer_to_list(DeliveryTag)}]
++ maybe_header("content-type", ContentType) ++ maybe_header("content-type", ContentType)
++ maybe_header("content-encoding", ContentEncoding) ++ maybe_header("content-encoding", ContentEncoding)
++ case ConsumerTag of ++ case ConsumerTag of
<<"Q_", _/binary>> -> <<"Q_", _/binary>> ->
[]; [];
<<"T_", Id/binary>> -> <<"T_", Id/binary>> ->
[{"subscription", binary_to_list(Id)}] [{"subscription", binary_to_list(Id)}]
end end
++ adhoc_convert_headers(case Headers of ++ adhoc_convert_headers(case Headers of
undefined -> []; undefined -> [];
_ -> Headers _ -> Headers
end) end)
++ maybe_header("delivery-mode", DeliveryMode) ++ maybe_header("delivery-mode", DeliveryMode)
++ maybe_header("priority", Priority) ++ maybe_header("priority", Priority)
++ maybe_header("correlation-id", CorrelationId) ++ maybe_header("correlation-id", CorrelationId)
++ maybe_header("reply-to", ReplyTo) ++ maybe_header("reply-to", ReplyTo)
++ maybe_header("amqp-message-id", MessageId), ++ maybe_header("amqp-message-id", MessageId),
lists:concat(lists:reverse(lists:map(fun erlang:binary_to_list/1, lists:concat(lists:reverse(lists:map(fun erlang:binary_to_list/1,
BodyFragmentsRev))), BodyFragmentsRev))),
State); State);
send_reply(Command, Content, State) -> send_reply(Command, Content, State) ->
error_logger:error_msg("STOMP Reply command unhandled: ~p~n~p~n", [Command, Content]), error_logger:error_msg("STOMP Reply command unhandled: ~p~n~p~n", [Command, Content]),
State. State.
@ -269,15 +269,15 @@ send_frame(Frame, State = #state{socket = Sock}) ->
send_frame(Command, Headers, Body, State) -> send_frame(Command, Headers, Body, State) ->
send_frame(#stomp_frame{command = Command, send_frame(#stomp_frame{command = Command,
headers = Headers, headers = Headers,
body = Body}, body = Body},
State). State).
send_error(Message, Detail, State) -> send_error(Message, Detail, State) ->
error_logger:error_msg("STOMP error frame sent:~nMessage: ~p~nDetail: ~p~n", error_logger:error_msg("STOMP error frame sent:~nMessage: ~p~nDetail: ~p~n",
[Message, Detail]), [Message, Detail]),
send_frame("ERROR", [{"message", Message}, send_frame("ERROR", [{"message", Message},
{"content-type", "text/plain"}], Detail, State). {"content-type", "text/plain"}], Detail, State).
send_error(Message, Format, Args, State) -> send_error(Message, Format, Args, State) ->
send_error(Message, lists:flatten(io_lib:format(Format, Args)), State). send_error(Message, lists:flatten(io_lib:format(Format, Args)), State).
@ -285,37 +285,37 @@ send_error(Message, Format, Args, State) ->
process_frame("CONNECT", Frame, State = #state{channel = none}) -> process_frame("CONNECT", Frame, State = #state{channel = none}) ->
{ok, DefaultVHost} = application:get_env(default_vhost), {ok, DefaultVHost} = application:get_env(default_vhost),
{ok, State1} = do_login(stomp_frame:header(Frame, "login"), {ok, State1} = do_login(stomp_frame:header(Frame, "login"),
stomp_frame:header(Frame, "passcode"), stomp_frame:header(Frame, "passcode"),
stomp_frame:header(Frame, "virtual-host", binary_to_list(DefaultVHost)), stomp_frame:header(Frame, "virtual-host", binary_to_list(DefaultVHost)),
State), State),
State2 = case stomp_frame:integer_header(Frame, "prefetch") of State2 = case stomp_frame:integer_header(Frame, "prefetch") of
{ok, PrefetchCount} -> {ok, PrefetchCount} ->
{ok, #'basic.qos_ok'{}, S} = {ok, #'basic.qos_ok'{}, S} =
simple_method_sync_rpc(#'basic.qos'{prefetch_size = 0, simple_method_sync_rpc(#'basic.qos'{prefetch_size = 0,
prefetch_count = PrefetchCount, prefetch_count = PrefetchCount,
global = false}, global = false},
State1), State1),
S; S;
not_found -> State1 not_found -> State1
end, end,
{ok, State2}; {ok, State2};
process_frame("DISCONNECT", _Frame, _State = #state{channel = none}) -> process_frame("DISCONNECT", _Frame, _State = #state{channel = none}) ->
stop; stop;
process_frame(_Command, _Frame, State = #state{channel = none}) -> process_frame(_Command, _Frame, State = #state{channel = none}) ->
{ok, send_error("Illegal command", {ok, send_error("Illegal command",
"You must log in using CONNECT first\n", "You must log in using CONNECT first\n",
State)}; State)};
process_frame(Command, Frame, State) -> process_frame(Command, Frame, State) ->
case process_command(Command, Frame, State) of case process_command(Command, Frame, State) of
{ok, State1} -> {ok, State1} ->
{ok, case stomp_frame:header(Frame, "receipt") of {ok, case stomp_frame:header(Frame, "receipt") of
{ok, Id} -> {ok, Id} ->
send_frame("RECEIPT", [{"receipt-id", Id}], "", State1); send_frame("RECEIPT", [{"receipt-id", Id}], "", State1);
not_found -> not_found ->
State1 State1
end}; end};
stop -> stop ->
stop stop
end. end.
send_method(Method, State = #state{channel = ChPid}) -> send_method(Method, State = #state{channel = ChPid}) ->
@ -324,28 +324,28 @@ send_method(Method, State = #state{channel = ChPid}) ->
send_method(Method, Properties, Body, State = #state{channel = ChPid}) -> send_method(Method, Properties, Body, State = #state{channel = ChPid}) ->
ok = rabbit_channel:do(ChPid, ok = rabbit_channel:do(ChPid,
Method, Method,
#content{class_id = 60, %% basic #content{class_id = 60, %% basic
properties = Properties, properties = Properties,
properties_bin = none, properties_bin = none,
payload_fragments_rev = [list_to_binary(Body)]}), payload_fragments_rev = [list_to_binary(Body)]}),
State. State.
do_login({ok, Login}, {ok, Passcode}, VirtualHost, State) -> do_login({ok, Login}, {ok, Passcode}, VirtualHost, State) ->
U = rabbit_access_control:user_pass_login(list_to_binary(Login), U = rabbit_access_control:user_pass_login(list_to_binary(Login),
list_to_binary(Passcode)), list_to_binary(Passcode)),
ok = rabbit_access_control:check_vhost_access(U, list_to_binary(VirtualHost)), ok = rabbit_access_control:check_vhost_access(U, list_to_binary(VirtualHost)),
ChPid = ChPid =
rabbit_channel:start_link(?MODULE, self(), self(), rabbit_channel:start_link(?MODULE, self(), self(),
U#user.username, list_to_binary(VirtualHost)), U#user.username, list_to_binary(VirtualHost)),
{ok, #'channel.open_ok'{}, State1} = {ok, #'channel.open_ok'{}, State1} =
simple_method_sync_rpc(#'channel.open'{out_of_band = <<"">>}, simple_method_sync_rpc(#'channel.open'{out_of_band = <<"">>},
State#state{channel = ChPid}), State#state{channel = ChPid}),
SessionId = rabbit_guid:string_guid("session"), SessionId = rabbit_guid:string_guid("session"),
{ok, send_frame("CONNECTED", {ok, send_frame("CONNECTED",
[{"session", SessionId}], [{"session", SessionId}],
"", "",
State1#state{session_id = SessionId})}; State1#state{session_id = SessionId})};
do_login(_, _, _, State) -> do_login(_, _, _, State) ->
{ok, send_error("Bad CONNECT", "Missing login or passcode header(s)\n", State)}. {ok, send_error("Bad CONNECT", "Missing login or passcode header(s)\n", State)}.
@ -361,39 +361,39 @@ user_binding_header_key(_) -> false.
make_string_table(_KeyFilter, []) -> []; make_string_table(_KeyFilter, []) -> [];
make_string_table(KeyFilter, [{K, V} | Rest]) -> make_string_table(KeyFilter, [{K, V} | Rest]) ->
case KeyFilter(K) of case KeyFilter(K) of
false -> false ->
make_string_table(KeyFilter, Rest); make_string_table(KeyFilter, Rest);
NewK -> NewK ->
[{list_to_binary(NewK), longstr, list_to_binary(V)} [{list_to_binary(NewK), longstr, list_to_binary(V)}
| make_string_table(KeyFilter, Rest)] | make_string_table(KeyFilter, Rest)]
end. end.
transactional(Frame) -> transactional(Frame) ->
case stomp_frame:header(Frame, "transaction") of case stomp_frame:header(Frame, "transaction") of
{ok, Transaction} -> {ok, Transaction} ->
{yes, Transaction}; {yes, Transaction};
not_found -> not_found ->
no no
end. end.
transactional_action(Frame, Name, Fun, State) -> transactional_action(Frame, Name, Fun, State) ->
case transactional(Frame) of case transactional(Frame) of
{yes, Transaction} -> {yes, Transaction} ->
Fun(Transaction, State); Fun(Transaction, State);
no -> no ->
{ok, send_error("Missing transaction", {ok, send_error("Missing transaction",
Name ++ " must include a 'transaction' header\n", Name ++ " must include a 'transaction' header\n",
State)} State)}
end. end.
with_transaction(Transaction, State, Fun) -> with_transaction(Transaction, State, Fun) ->
case get({transaction, Transaction}) of case get({transaction, Transaction}) of
undefined -> undefined ->
{ok, send_error("Bad transaction", {ok, send_error("Bad transaction",
"Invalid transaction identifier: ~p~n", [Transaction], "Invalid transaction identifier: ~p~n", [Transaction],
State)}; State)};
Actions -> Actions ->
Fun(Actions, State) Fun(Actions, State)
end. end.
begin_transaction(Transaction, State) -> begin_transaction(Transaction, State) ->
@ -402,27 +402,27 @@ begin_transaction(Transaction, State) ->
extend_transaction(Transaction, Action, State0) -> extend_transaction(Transaction, Action, State0) ->
with_transaction(Transaction, State0, with_transaction(Transaction, State0,
fun (Actions, State) -> fun (Actions, State) ->
put({transaction, Transaction}, [Action | Actions]), put({transaction, Transaction}, [Action | Actions]),
{ok, State} {ok, State}
end). end).
commit_transaction(Transaction, State0) -> commit_transaction(Transaction, State0) ->
with_transaction(Transaction, State0, with_transaction(Transaction, State0,
fun (Actions, State) -> fun (Actions, State) ->
FinalState = lists:foldr(fun perform_transaction_action/2, FinalState = lists:foldr(fun perform_transaction_action/2,
State, State,
Actions), Actions),
erase({transaction, Transaction}), erase({transaction, Transaction}),
{ok, FinalState} {ok, FinalState}
end). end).
abort_transaction(Transaction, State0) -> abort_transaction(Transaction, State0) ->
with_transaction(Transaction, State0, with_transaction(Transaction, State0,
fun (_Actions, State) -> fun (_Actions, State) ->
erase({transaction, Transaction}), erase({transaction, Transaction}),
{ok, State} {ok, State}
end). end).
perform_transaction_action({Method}, State) -> perform_transaction_action({Method}, State) ->
send_method(Method, State); send_method(Method, State);
@ -432,144 +432,144 @@ perform_transaction_action({Method, Props, Body}, State) ->
process_command("BEGIN", Frame, State) -> process_command("BEGIN", Frame, State) ->
transactional_action(Frame, "BEGIN", fun begin_transaction/2, State); transactional_action(Frame, "BEGIN", fun begin_transaction/2, State);
process_command("SEND", process_command("SEND",
Frame = #stomp_frame{headers = Headers, body = Body}, Frame = #stomp_frame{headers = Headers, body = Body},
State) -> State) ->
case stomp_frame:header(Frame, "destination") of case stomp_frame:header(Frame, "destination") of
{ok, RoutingKeyStr} -> {ok, RoutingKeyStr} ->
ExchangeStr = stomp_frame:header(Frame, "exchange", ""), ExchangeStr = stomp_frame:header(Frame, "exchange", ""),
Props = #'P_basic'{ Props = #'P_basic'{
content_type = stomp_frame:binary_header(Frame, "content-type", <<"text/plain">>), content_type = stomp_frame:binary_header(Frame, "content-type", <<"text/plain">>),
content_encoding = stomp_frame:binary_header(Frame, "content-encoding", undefined), content_encoding = stomp_frame:binary_header(Frame, "content-encoding", undefined),
headers = make_string_table(fun user_header_key/1, Headers), headers = make_string_table(fun user_header_key/1, Headers),
delivery_mode = stomp_frame:integer_header(Frame, "delivery-mode", undefined), delivery_mode = stomp_frame:integer_header(Frame, "delivery-mode", undefined),
priority = stomp_frame:integer_header(Frame, "priority", undefined), priority = stomp_frame:integer_header(Frame, "priority", undefined),
correlation_id = stomp_frame:binary_header(Frame, "correlation-id", undefined), correlation_id = stomp_frame:binary_header(Frame, "correlation-id", undefined),
reply_to = stomp_frame:binary_header(Frame, "reply-to", undefined), reply_to = stomp_frame:binary_header(Frame, "reply-to", undefined),
message_id = stomp_frame:binary_header(Frame, "amqp-message-id", undefined) message_id = stomp_frame:binary_header(Frame, "amqp-message-id", undefined)
}, },
Method = #'basic.publish'{exchange = list_to_binary(ExchangeStr), Method = #'basic.publish'{exchange = list_to_binary(ExchangeStr),
routing_key = list_to_binary(RoutingKeyStr), routing_key = list_to_binary(RoutingKeyStr),
mandatory = false, mandatory = false,
immediate = false}, immediate = false},
case transactional(Frame) of case transactional(Frame) of
{yes, Transaction} -> {yes, Transaction} ->
extend_transaction(Transaction, {Method, Props, Body}, State); extend_transaction(Transaction, {Method, Props, Body}, State);
no -> no ->
{ok, send_method(Method, Props, Body, State)} {ok, send_method(Method, Props, Body, State)}
end; end;
not_found -> not_found ->
{ok, send_error("Missing destination", {ok, send_error("Missing destination",
"SEND must include a 'destination', and optional 'exchange' header\n", "SEND must include a 'destination', and optional 'exchange' header\n",
State)} State)}
end; end;
process_command("ACK", Frame, State = #state{session_id = SessionId}) -> process_command("ACK", Frame, State = #state{session_id = SessionId}) ->
case stomp_frame:header(Frame, "message-id") of case stomp_frame:header(Frame, "message-id") of
{ok, IdStr} -> {ok, IdStr} ->
IdPrefix = SessionId ++ "_", IdPrefix = SessionId ++ "_",
case string:substr(IdStr, 1, length(IdPrefix)) of case string:substr(IdStr, 1, length(IdPrefix)) of
IdPrefix -> IdPrefix ->
DeliveryTag = list_to_integer(string:substr(IdStr, length(IdPrefix) + 1)), DeliveryTag = list_to_integer(string:substr(IdStr, length(IdPrefix) + 1)),
Method = #'basic.ack'{delivery_tag = DeliveryTag, Method = #'basic.ack'{delivery_tag = DeliveryTag,
multiple = false}, multiple = false},
case transactional(Frame) of case transactional(Frame) of
{yes, Transaction} -> {yes, Transaction} ->
extend_transaction(Transaction, {Method}, State); extend_transaction(Transaction, {Method}, State);
no -> no ->
{ok, send_method(Method, State)} {ok, send_method(Method, State)}
end; end;
_ -> _ ->
rabbit_misc:die(command_invalid, 'basic.ack') rabbit_misc:die(command_invalid, 'basic.ack')
end; end;
not_found -> not_found ->
{ok, send_error("Missing message-id", {ok, send_error("Missing message-id",
"ACK must include a 'message-id' header\n", "ACK must include a 'message-id' header\n",
State)} State)}
end; end;
process_command("COMMIT", Frame, State) -> process_command("COMMIT", Frame, State) ->
transactional_action(Frame, "COMMIT", fun commit_transaction/2, State); transactional_action(Frame, "COMMIT", fun commit_transaction/2, State);
process_command("ABORT", Frame, State) -> process_command("ABORT", Frame, State) ->
transactional_action(Frame, "ABORT", fun abort_transaction/2, State); transactional_action(Frame, "ABORT", fun abort_transaction/2, State);
process_command("SUBSCRIBE", process_command("SUBSCRIBE",
Frame = #stomp_frame{headers = Headers}, Frame = #stomp_frame{headers = Headers},
State) -> State) ->
AckMode = case stomp_frame:header(Frame, "ack", "auto") of AckMode = case stomp_frame:header(Frame, "ack", "auto") of
"auto" -> auto; "auto" -> auto;
"client" -> client "client" -> client
end, end,
case stomp_frame:header(Frame, "destination") of case stomp_frame:header(Frame, "destination") of
{ok, QueueStr} -> {ok, QueueStr} ->
ConsumerTag = case stomp_frame:header(Frame, "id") of ConsumerTag = case stomp_frame:header(Frame, "id") of
{ok, Str} -> {ok, Str} ->
list_to_binary("T_" ++ Str); list_to_binary("T_" ++ Str);
not_found -> not_found ->
list_to_binary("Q_" ++ QueueStr) list_to_binary("Q_" ++ QueueStr)
end, end,
Queue = list_to_binary(QueueStr), Queue = list_to_binary(QueueStr),
State1 = send_method(#'queue.declare'{queue = Queue, State1 = send_method(#'queue.declare'{queue = Queue,
passive = stomp_frame:boolean_header(Frame, "passive", false), passive = stomp_frame:boolean_header(Frame, "passive", false),
durable = stomp_frame:boolean_header(Frame, "durable", false), durable = stomp_frame:boolean_header(Frame, "durable", false),
exclusive = stomp_frame:boolean_header(Frame, "exclusive", false), exclusive = stomp_frame:boolean_header(Frame, "exclusive", false),
auto_delete = stomp_frame:boolean_header(Frame, "auto-delete", true), auto_delete = stomp_frame:boolean_header(Frame, "auto-delete", true),
nowait = true, nowait = true,
arguments = arguments =
make_string_table(fun user_queue_header_key/1, make_string_table(fun user_queue_header_key/1,
Headers)}, Headers)},
State), State),
State2 = case stomp_frame:header(Frame, "exchange") of State2 = case stomp_frame:header(Frame, "exchange") of
{ok, ExchangeStr } -> {ok, ExchangeStr } ->
Exchange = list_to_binary(ExchangeStr), Exchange = list_to_binary(ExchangeStr),
RoutingKeyStr = stomp_frame:header(Frame, "routing_key", ""), RoutingKeyStr = stomp_frame:header(Frame, "routing_key", ""),
RoutingKey = list_to_binary(RoutingKeyStr), RoutingKey = list_to_binary(RoutingKeyStr),
send_method(#'queue.bind'{queue = Queue, send_method(#'queue.bind'{queue = Queue,
exchange = Exchange, exchange = Exchange,
routing_key = RoutingKey, routing_key = RoutingKey,
nowait = true, nowait = true,
arguments = arguments =
make_string_table( make_string_table(
fun user_binding_header_key/1, fun user_binding_header_key/1,
Headers)}, Headers)},
State1); State1);
not_found -> State1 not_found -> State1
end, end,
State3 = send_method(#'basic.consume'{queue = Queue, State3 = send_method(#'basic.consume'{queue = Queue,
consumer_tag = ConsumerTag, consumer_tag = ConsumerTag,
no_local = false, no_local = false,
no_ack = (AckMode == auto), no_ack = (AckMode == auto),
exclusive = false, exclusive = false,
nowait = true}, nowait = true},
State2), State2),
{ok, State3}; {ok, State3};
not_found -> not_found ->
{ok, send_error("Missing destination", {ok, send_error("Missing destination",
"SUBSCRIBE must include a 'destination' header\n", "SUBSCRIBE must include a 'destination' header\n",
State)} State)}
end; end;
process_command("UNSUBSCRIBE", Frame, State) -> process_command("UNSUBSCRIBE", Frame, State) ->
ConsumerTag = case stomp_frame:header(Frame, "id") of ConsumerTag = case stomp_frame:header(Frame, "id") of
{ok, IdStr} -> {ok, IdStr} ->
list_to_binary("T_" ++ IdStr); list_to_binary("T_" ++ IdStr);
not_found -> not_found ->
case stomp_frame:header(Frame, "destination") of case stomp_frame:header(Frame, "destination") of
{ok, QueueStr} -> {ok, QueueStr} ->
list_to_binary("Q_" ++ QueueStr); list_to_binary("Q_" ++ QueueStr);
not_found -> not_found ->
missing missing
end end
end, end,
if if
ConsumerTag == missing -> ConsumerTag == missing ->
{ok, send_error("Missing destination or id", {ok, send_error("Missing destination or id",
"UNSUBSCRIBE must include a 'destination' or 'id' header\n", "UNSUBSCRIBE must include a 'destination' or 'id' header\n",
State)}; State)};
true -> true ->
{ok, send_method(#'basic.cancel'{consumer_tag = ConsumerTag, {ok, send_method(#'basic.cancel'{consumer_tag = ConsumerTag,
nowait = true}, nowait = true},
State)} State)}
end; end;
process_command("DISCONNECT", _Frame, State) -> process_command("DISCONNECT", _Frame, State) ->
{ok, send_method(#'channel.close'{reply_code = 200, reply_text = <<"">>, {ok, send_method(#'channel.close'{reply_code = 200, reply_text = <<"">>,
class_id = 0, method_id = 0}, State)}; class_id = 0, method_id = 0}, State)};
process_command(Command, _Frame, State) -> process_command(Command, _Frame, State) ->
{ok, send_error("Bad command", {ok, send_error("Bad command",
"Could not interpret command " ++ Command ++ "\n", "Could not interpret command " ++ Command ++ "\n",
State)}. State)}.

View File

@ -60,14 +60,14 @@ parse_headers([$\n | Rest], ParseState = #hstate{state = command, acc = []}) ->
parse_headers(Rest, ParseState); parse_headers(Rest, ParseState);
parse_headers([$\n | Rest], ParseState = #hstate{state = command, acc = Acc}) -> parse_headers([$\n | Rest], ParseState = #hstate{state = command, acc = Acc}) ->
parse_headers(Rest, ParseState#hstate{state = key, acc = [], parse_headers(Rest, ParseState#hstate{state = key, acc = [],
command = lists:reverse(Acc)}); command = lists:reverse(Acc)});
parse_headers([$\n | Rest], _ParseState = #hstate{state = key, acc = Acc, parse_headers([$\n | Rest], _ParseState = #hstate{state = key, acc = Acc,
command = Command, headers = Headers}) -> command = Command, headers = Headers}) ->
case Acc of case Acc of
[] -> [] ->
{ok, Command, Headers, Rest}; {ok, Command, Headers, Rest};
_ -> _ ->
{error, {bad_header_key, lists:reverse(Acc)}} {error, {bad_header_key, lists:reverse(Acc)}}
end; end;
parse_headers([$: | Rest], ParseState = #hstate{state = key, acc = Acc}) -> parse_headers([$: | Rest], ParseState = #hstate{state = key, acc = Acc}) ->
parse_headers(Rest, ParseState#hstate{state = eatspace, acc = [], key = lists:reverse(Acc)}); parse_headers(Rest, ParseState#hstate{state = eatspace, acc = [], key = lists:reverse(Acc)});
@ -76,82 +76,82 @@ parse_headers([$ | Rest], ParseState = #hstate{state = eatspace}) ->
parse_headers(Input, ParseState = #hstate{state = eatspace}) -> parse_headers(Input, ParseState = #hstate{state = eatspace}) ->
parse_headers(Input, ParseState#hstate{state = value}); parse_headers(Input, ParseState#hstate{state = value});
parse_headers([$\n | Rest], ParseState = #hstate{state = value, acc = Acc, key = Key, parse_headers([$\n | Rest], ParseState = #hstate{state = value, acc = Acc, key = Key,
headers = Headers}) -> headers = Headers}) ->
parse_headers(Rest, ParseState#hstate{state = key, acc = [], parse_headers(Rest, ParseState#hstate{state = key, acc = [],
headers = [{Key, lists:reverse(Acc)} headers = [{Key, lists:reverse(Acc)}
| Headers]}); | Headers]});
parse_headers([Ch | Rest], ParseState = #hstate{acc = Acc}) -> parse_headers([Ch | Rest], ParseState = #hstate{acc = Acc}) ->
if if
Ch < 32 -> Ch < 32 ->
{error, {bad_character, Ch}}; {error, {bad_character, Ch}};
true -> true ->
parse_headers(Rest, ParseState#hstate{acc = [Ch | Acc]}) parse_headers(Rest, ParseState#hstate{acc = [Ch | Acc]})
end. end.
header(#stomp_frame{headers = Headers}, Key) -> header(#stomp_frame{headers = Headers}, Key) ->
case lists:keysearch(Key, 1, Headers) of case lists:keysearch(Key, 1, Headers) of
{value, {_, Str}} -> {value, {_, Str}} ->
{ok, Str}; {ok, Str};
_ -> _ ->
not_found not_found
end. end.
header(#stomp_frame{headers = Headers}, Key, DefaultValue) -> header(#stomp_frame{headers = Headers}, Key, DefaultValue) ->
case lists:keysearch(Key, 1, Headers) of case lists:keysearch(Key, 1, Headers) of
{value, {_, Str}} -> {value, {_, Str}} ->
Str; Str;
_ -> _ ->
DefaultValue DefaultValue
end. end.
boolean_header(#stomp_frame{headers = Headers}, Key) -> boolean_header(#stomp_frame{headers = Headers}, Key) ->
boolean_header(Headers, Key); boolean_header(Headers, Key);
boolean_header(Headers, Key) -> boolean_header(Headers, Key) ->
case lists:keysearch(Key, 1, Headers) of case lists:keysearch(Key, 1, Headers) of
{value, {_, "true"}} -> {value, {_, "true"}} ->
{ok, true}; {ok, true};
{value, {_, "false"}} -> {value, {_, "false"}} ->
{ok, false}; {ok, false};
_ -> _ ->
not_found not_found
end. end.
boolean_header(H, Key, D) -> boolean_header(H, Key, D) ->
case boolean_header(H, Key) of case boolean_header(H, Key) of
{ok, V} -> {ok, V} ->
V; V;
not_found -> not_found ->
D D
end. end.
integer_header(#stomp_frame{headers = Headers}, Key) -> integer_header(#stomp_frame{headers = Headers}, Key) ->
integer_header(Headers, Key); integer_header(Headers, Key);
integer_header(Headers, Key) -> integer_header(Headers, Key) ->
case lists:keysearch(Key, 1, Headers) of case lists:keysearch(Key, 1, Headers) of
{value, {_, Str}} -> {value, {_, Str}} ->
{ok, list_to_integer(string:strip(Str))}; {ok, list_to_integer(string:strip(Str))};
_ -> _ ->
not_found not_found
end. end.
integer_header(H, Key, D) -> integer_header(H, Key, D) ->
case integer_header(H, Key) of case integer_header(H, Key) of
{ok, V} -> {ok, V} ->
V; V;
not_found -> not_found ->
D D
end. end.
binary_header(F, K) -> binary_header(F, K) ->
case header(F, K) of case header(F, K) of
{ok, Str} -> {ok, list_to_binary(Str)}; {ok, Str} -> {ok, list_to_binary(Str)};
not_found -> not_found not_found -> not_found
end. end.
binary_header(F, K, V) -> binary_header(F, K, V) ->
case header(F, K) of case header(F, K) of
{ok, Str} -> list_to_binary(Str); {ok, Str} -> list_to_binary(Str);
not_found -> V not_found -> V
end. end.
content_length(Headers) -> content_length(Headers) ->
@ -159,10 +159,10 @@ content_length(Headers) ->
initial_body_state(Headers) -> initial_body_state(Headers) ->
case content_length(Headers) of case content_length(Headers) of
{ok, ByteCount} -> {ok, ByteCount} ->
#bstate{acc = [], remaining = ByteCount}; #bstate{acc = [], remaining = ByteCount};
not_found -> not_found ->
#bstate{acc = [], remaining = unknown} #bstate{acc = [], remaining = unknown}
end. end.
parse_body([], State) -> parse_body([], State) ->
@ -175,10 +175,10 @@ parse_body([Ch | Rest], State = #bstate{acc = Acc, remaining = unknown}) ->
parse_body(Rest, State#bstate{acc = [Ch | Acc]}); parse_body(Rest, State#bstate{acc = [Ch | Acc]});
parse_body([Ch | Rest], State = #bstate{acc = Acc, remaining = N}) -> parse_body([Ch | Rest], State = #bstate{acc = Acc, remaining = N}) ->
if if
N > 0 -> N > 0 ->
parse_body(Rest, State#bstate{acc = [Ch | Acc], remaining = N - 1}); parse_body(Rest, State#bstate{acc = [Ch | Acc], remaining = N - 1});
true -> true ->
{error, missing_body_terminator} {error, missing_body_terminator}
end. end.
initial_state() -> initial_state() ->
@ -186,36 +186,36 @@ initial_state() ->
parse(Rest, {headers, HState}) -> parse(Rest, {headers, HState}) ->
case parse_headers(Rest, HState) of case parse_headers(Rest, HState) of
{more, HState1} -> {more, HState1} ->
{more, {headers, HState1}}; {more, {headers, HState1}};
{ok, Command, Headers, Rest1} -> {ok, Command, Headers, Rest1} ->
parse(Rest1, #stomp_frame{command = Command, parse(Rest1, #stomp_frame{command = Command,
headers = Headers, headers = Headers,
body = initial_body_state(Headers)}); body = initial_body_state(Headers)});
E = {error, _} -> E = {error, _} ->
E E
end; end;
parse(Rest, Frame = #stomp_frame{body = BState}) -> parse(Rest, Frame = #stomp_frame{body = BState}) ->
case parse_body(Rest, BState) of case parse_body(Rest, BState) of
{more, BState1} -> {more, BState1} ->
{more, Frame#stomp_frame{body = BState1}}; {more, Frame#stomp_frame{body = BState1}};
{ok, Body, Rest1} -> {ok, Body, Rest1} ->
{ok, Frame#stomp_frame{body = Body}, Rest1}; {ok, Frame#stomp_frame{body = Body}, Rest1};
E = {error, _} -> E = {error, _} ->
E E
end. end.
serialize(#stomp_frame{command = Command, serialize(#stomp_frame{command = Command,
headers = Headers, headers = Headers,
body = Body}) -> body = Body}) ->
Len = length(Body), Len = length(Body),
[Command, $\n, [Command, $\n,
lists:map(fun serialize_header/1, lists:keydelete("content-length", 1, Headers)), lists:map(fun serialize_header/1, lists:keydelete("content-length", 1, Headers)),
if if
Len > 0 -> Len > 0 ->
["content-length:", integer_to_list(length(Body)), $\n]; ["content-length:", integer_to_list(length(Body)), $\n];
true -> true ->
[] []
end, end,
$\n, $\n,
Body, Body,