From ad6e3929399241fd3e62ce09124f4172f9030bea Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Mon, 16 Feb 2009 17:14:03 +0000 Subject: [PATCH] Untabify. --- deps/rabbitmq_stomp/src/rabbit_stomp.erl | 620 +++++++++++------------ deps/rabbitmq_stomp/src/stomp_frame.erl | 142 +++--- 2 files changed, 381 insertions(+), 381 deletions(-) diff --git a/deps/rabbitmq_stomp/src/rabbit_stomp.erl b/deps/rabbitmq_stomp/src/rabbit_stomp.erl index 7455379958..bb6567917d 100644 --- a/deps/rabbitmq_stomp/src/rabbit_stomp.erl +++ b/deps/rabbitmq_stomp/src/rabbit_stomp.erl @@ -35,9 +35,9 @@ -module(rabbit_stomp). -export([kickstart/0, - start/1, - listener_started/2, listener_stopped/2, start_client/1, - start_link/0, init/1, mainloop/1]). + start/1, + listener_started/2, listener_stopped/2, start_client/1, + start_link/0, init/1, mainloop/1]). -include("rabbit.hrl"). -include("rabbit_framing.hrl"). @@ -64,18 +64,18 @@ start_listeners([]) -> ok; start_listeners([{Host, Port} | More]) -> {IPAddress, Name} = rabbit_networking:check_tcp_listener_address(rabbit_stomp_listener_sup, - Host, - Port), + Host, + Port), {ok,_} = supervisor:start_child( rabbit_sup, {Name, {tcp_listener_sup, start_link, - [IPAddress, Port, - [{packet, raw}, - {reuseaddr, true}], - {?MODULE, listener_started, []}, - {?MODULE, listener_stopped, []}, - {?MODULE, start_client, []}]}, + [IPAddress, Port, + [{packet, raw}, + {reuseaddr, true}], + {?MODULE, listener_started, []}, + {?MODULE, listener_stopped, []}, + {?MODULE, start_client, []}]}, transient, infinity, supervisor, [tcp_listener_sup]}), start_listeners(More). @@ -97,62 +97,62 @@ start_link() -> init(_Parent) -> receive {go, Sock} -> - ok = inet:setopts(Sock, [{active, true}]), - process_flag(trap_exit, true), + ok = inet:setopts(Sock, [{active, true}]), + process_flag(trap_exit, true), - {ok, {PeerAddress, PeerPort}} = inet:peername(Sock), - PeerAddressS = inet_parse:ntoa(PeerAddress), - error_logger:info_msg("starting STOMP connection ~p from ~s:~p~n", - [self(), PeerAddressS, PeerPort]), + {ok, {PeerAddress, PeerPort}} = inet:peername(Sock), + PeerAddressS = inet_parse:ntoa(PeerAddress), + error_logger:info_msg("starting STOMP connection ~p from ~s:~p~n", + [self(), PeerAddressS, PeerPort]), - ?MODULE:mainloop(#state{socket = Sock, - channel = none, - parse_state = stomp_frame:initial_state()}), + ?MODULE:mainloop(#state{socket = Sock, + channel = none, + parse_state = stomp_frame:initial_state()}), - error_logger:info_msg("ending STOMP connection ~p from ~s:~p~n", - [self(), PeerAddressS, PeerPort]) + error_logger:info_msg("ending STOMP connection ~p from ~s:~p~n", + [self(), PeerAddressS, PeerPort]) end. mainloop(State) -> receive - E = {'EXIT', _Pid, _Reason} -> - handle_exit(E, State); - {tcp, _Sock, Bytes} -> - process_received_bytes(Bytes, State); - {tcp_closed, _Sock} -> - case State#state.channel of - none -> - done; - ChPid -> - rabbit_channel:shutdown(ChPid), - ?MODULE:mainloop(State) - end; - {send_command, Command} -> - ?MODULE:mainloop(send_reply(Command, State)); - {send_command_and_notify, QPid, TxPid, Method, Content} -> - State1 = send_reply(Method, Content, State), - rabbit_amqqueue:notify_sent(QPid, TxPid), - ?MODULE:mainloop(State1); - {send_command_and_shutdown, Command} -> - send_reply(Command, State), - done; - shutdown -> - %% This is the channel telling the writer to shut down. We - %% ignore this, as the channel will exit itself shortly, - %% which event we do respond to. - ?MODULE:mainloop(State); - Data -> - error_logger:error_msg("Internal error: unknown STOMP Data: ~p~n", [Data]), - ?MODULE:mainloop(State) + E = {'EXIT', _Pid, _Reason} -> + handle_exit(E, State); + {tcp, _Sock, Bytes} -> + process_received_bytes(Bytes, State); + {tcp_closed, _Sock} -> + case State#state.channel of + none -> + done; + ChPid -> + rabbit_channel:shutdown(ChPid), + ?MODULE:mainloop(State) + end; + {send_command, Command} -> + ?MODULE:mainloop(send_reply(Command, State)); + {send_command_and_notify, QPid, TxPid, Method, Content} -> + State1 = send_reply(Method, Content, State), + rabbit_amqqueue:notify_sent(QPid, TxPid), + ?MODULE:mainloop(State1); + {send_command_and_shutdown, Command} -> + send_reply(Command, State), + done; + shutdown -> + %% This is the channel telling the writer to shut down. We + %% ignore this, as the channel will exit itself shortly, + %% which event we do respond to. + ?MODULE:mainloop(State); + Data -> + error_logger:error_msg("Internal error: unknown STOMP Data: ~p~n", [Data]), + ?MODULE:mainloop(State) end. simple_method_sync_rpc(Method, State0) -> State = send_method(Method, State0), receive - E = {'EXIT', _Pid, _Reason} -> - handle_exit(E, State); - {send_command, Reply} -> - {ok, Reply, State} + E = {'EXIT', _Pid, _Reason} -> + handle_exit(E, State); + {send_command, Reply} -> + {ok, Reply, State} end. handle_exit({'EXIT', _Pid, normal}, _State) -> @@ -173,26 +173,26 @@ process_received_bytes([], State) -> ?MODULE:mainloop(State); process_received_bytes(Bytes, State = #state{parse_state = ParseState}) -> case stomp_frame:parse(Bytes, ParseState) of - {more, ParseState1} -> - ?MODULE:mainloop(State#state{parse_state = ParseState1}); - {ok, Frame = #stomp_frame{command = Command}, Rest} -> - %% io:format("Frame: ~p~n", [Frame]), - case catch process_frame(Command, Frame, - State#state{parse_state = stomp_frame:initial_state()}) of - {'EXIT', {amqp, Code, Method}} -> - explain_amqp_death(Code, Method, State), - done; - {'EXIT', Reason} -> - send_error("Processing error", "~p~n", [Reason], State), - done; - {ok, NewState} -> - process_received_bytes(Rest, NewState); - stop -> - done - end; - {error, Reason} -> - send_error("Invalid frame", "Could not parse frame: ~p~n", [Reason], State), - done + {more, ParseState1} -> + ?MODULE:mainloop(State#state{parse_state = ParseState1}); + {ok, Frame = #stomp_frame{command = Command}, Rest} -> + %% io:format("Frame: ~p~n", [Frame]), + case catch process_frame(Command, Frame, + State#state{parse_state = stomp_frame:initial_state()}) of + {'EXIT', {amqp, Code, Method}} -> + explain_amqp_death(Code, Method, State), + done; + {'EXIT', Reason} -> + send_error("Processing error", "~p~n", [Reason], State), + done; + {ok, NewState} -> + process_received_bytes(Rest, NewState); + stop -> + done + end; + {error, Reason} -> + send_error("Invalid frame", "Could not parse frame: ~p~n", [Reason], State), + done end. explain_amqp_death(Code, Method, State) -> @@ -210,45 +210,45 @@ maybe_header(Key, Value) when is_integer(Value) -> [{Key, integer_to_list(Value) maybe_header(_Key, _Value) -> []. send_reply(#'basic.deliver'{consumer_tag = ConsumerTag, - delivery_tag = DeliveryTag, - exchange = Exchange, - routing_key = RoutingKey}, - #content{properties = #'P_basic'{headers = Headers, - content_type = ContentType, - content_encoding = ContentEncoding, - delivery_mode = DeliveryMode, - priority = Priority, - correlation_id = CorrelationId, - reply_to = ReplyTo, - message_id = MessageId}, - payload_fragments_rev = BodyFragmentsRev}, - State = #state{session_id = SessionId}) -> + delivery_tag = DeliveryTag, + exchange = Exchange, + routing_key = RoutingKey}, + #content{properties = #'P_basic'{headers = Headers, + content_type = ContentType, + content_encoding = ContentEncoding, + delivery_mode = DeliveryMode, + priority = Priority, + correlation_id = CorrelationId, + reply_to = ReplyTo, + message_id = MessageId}, + payload_fragments_rev = BodyFragmentsRev}, + State = #state{session_id = SessionId}) -> send_frame("MESSAGE", - [{"destination", binary_to_list(RoutingKey)}, - {"exchange", binary_to_list(Exchange)}, - %% TODO append ContentEncoding as ContentType; charset=ContentEncoding? - %% The STOMP SEND handle could also parse "content-type" to split it, perhaps? - {"message-id", SessionId ++ "_" ++ integer_to_list(DeliveryTag)}] - ++ maybe_header("content-type", ContentType) - ++ maybe_header("content-encoding", ContentEncoding) - ++ case ConsumerTag of - <<"Q_", _/binary>> -> - []; - <<"T_", Id/binary>> -> - [{"subscription", binary_to_list(Id)}] - end - ++ adhoc_convert_headers(case Headers of - undefined -> []; - _ -> Headers - end) - ++ maybe_header("delivery-mode", DeliveryMode) - ++ maybe_header("priority", Priority) - ++ maybe_header("correlation-id", CorrelationId) - ++ maybe_header("reply-to", ReplyTo) - ++ maybe_header("amqp-message-id", MessageId), - lists:concat(lists:reverse(lists:map(fun erlang:binary_to_list/1, - BodyFragmentsRev))), - State); + [{"destination", binary_to_list(RoutingKey)}, + {"exchange", binary_to_list(Exchange)}, + %% TODO append ContentEncoding as ContentType; charset=ContentEncoding? + %% The STOMP SEND handle could also parse "content-type" to split it, perhaps? + {"message-id", SessionId ++ "_" ++ integer_to_list(DeliveryTag)}] + ++ maybe_header("content-type", ContentType) + ++ maybe_header("content-encoding", ContentEncoding) + ++ case ConsumerTag of + <<"Q_", _/binary>> -> + []; + <<"T_", Id/binary>> -> + [{"subscription", binary_to_list(Id)}] + end + ++ adhoc_convert_headers(case Headers of + undefined -> []; + _ -> Headers + end) + ++ maybe_header("delivery-mode", DeliveryMode) + ++ maybe_header("priority", Priority) + ++ maybe_header("correlation-id", CorrelationId) + ++ maybe_header("reply-to", ReplyTo) + ++ maybe_header("amqp-message-id", MessageId), + lists:concat(lists:reverse(lists:map(fun erlang:binary_to_list/1, + BodyFragmentsRev))), + State); send_reply(Command, Content, State) -> error_logger:error_msg("STOMP Reply command unhandled: ~p~n~p~n", [Command, Content]), State. @@ -269,15 +269,15 @@ send_frame(Frame, State = #state{socket = Sock}) -> send_frame(Command, Headers, Body, State) -> send_frame(#stomp_frame{command = Command, - headers = Headers, - body = Body}, - State). + headers = Headers, + body = Body}, + State). send_error(Message, Detail, State) -> error_logger:error_msg("STOMP error frame sent:~nMessage: ~p~nDetail: ~p~n", - [Message, Detail]), + [Message, Detail]), send_frame("ERROR", [{"message", Message}, - {"content-type", "text/plain"}], Detail, State). + {"content-type", "text/plain"}], Detail, State). send_error(Message, Format, Args, State) -> send_error(Message, lists:flatten(io_lib:format(Format, Args)), State). @@ -285,37 +285,37 @@ send_error(Message, Format, Args, State) -> process_frame("CONNECT", Frame, State = #state{channel = none}) -> {ok, DefaultVHost} = application:get_env(default_vhost), {ok, State1} = do_login(stomp_frame:header(Frame, "login"), - stomp_frame:header(Frame, "passcode"), - stomp_frame:header(Frame, "virtual-host", binary_to_list(DefaultVHost)), - State), + stomp_frame:header(Frame, "passcode"), + stomp_frame:header(Frame, "virtual-host", binary_to_list(DefaultVHost)), + State), State2 = case stomp_frame:integer_header(Frame, "prefetch") of - {ok, PrefetchCount} -> - {ok, #'basic.qos_ok'{}, S} = - simple_method_sync_rpc(#'basic.qos'{prefetch_size = 0, - prefetch_count = PrefetchCount, - global = false}, - State1), - S; - not_found -> State1 - end, + {ok, PrefetchCount} -> + {ok, #'basic.qos_ok'{}, S} = + simple_method_sync_rpc(#'basic.qos'{prefetch_size = 0, + prefetch_count = PrefetchCount, + global = false}, + State1), + S; + not_found -> State1 + end, {ok, State2}; process_frame("DISCONNECT", _Frame, _State = #state{channel = none}) -> stop; process_frame(_Command, _Frame, State = #state{channel = none}) -> {ok, send_error("Illegal command", - "You must log in using CONNECT first\n", - State)}; + "You must log in using CONNECT first\n", + State)}; process_frame(Command, Frame, State) -> case process_command(Command, Frame, State) of - {ok, State1} -> - {ok, case stomp_frame:header(Frame, "receipt") of - {ok, Id} -> - send_frame("RECEIPT", [{"receipt-id", Id}], "", State1); - not_found -> - State1 - end}; - stop -> - stop + {ok, State1} -> + {ok, case stomp_frame:header(Frame, "receipt") of + {ok, Id} -> + send_frame("RECEIPT", [{"receipt-id", Id}], "", State1); + not_found -> + State1 + end}; + stop -> + stop end. send_method(Method, State = #state{channel = ChPid}) -> @@ -324,28 +324,28 @@ send_method(Method, State = #state{channel = ChPid}) -> send_method(Method, Properties, Body, State = #state{channel = ChPid}) -> ok = rabbit_channel:do(ChPid, - Method, - #content{class_id = 60, %% basic - properties = Properties, - properties_bin = none, - payload_fragments_rev = [list_to_binary(Body)]}), + Method, + #content{class_id = 60, %% basic + properties = Properties, + properties_bin = none, + payload_fragments_rev = [list_to_binary(Body)]}), State. do_login({ok, Login}, {ok, Passcode}, VirtualHost, State) -> U = rabbit_access_control:user_pass_login(list_to_binary(Login), - list_to_binary(Passcode)), + list_to_binary(Passcode)), ok = rabbit_access_control:check_vhost_access(U, list_to_binary(VirtualHost)), ChPid = - rabbit_channel:start_link(?MODULE, self(), self(), - U#user.username, list_to_binary(VirtualHost)), + rabbit_channel:start_link(?MODULE, self(), self(), + U#user.username, list_to_binary(VirtualHost)), {ok, #'channel.open_ok'{}, State1} = - simple_method_sync_rpc(#'channel.open'{out_of_band = <<"">>}, - State#state{channel = ChPid}), + simple_method_sync_rpc(#'channel.open'{out_of_band = <<"">>}, + State#state{channel = ChPid}), SessionId = rabbit_guid:string_guid("session"), {ok, send_frame("CONNECTED", - [{"session", SessionId}], - "", - State1#state{session_id = SessionId})}; + [{"session", SessionId}], + "", + State1#state{session_id = SessionId})}; do_login(_, _, _, State) -> {ok, send_error("Bad CONNECT", "Missing login or passcode header(s)\n", State)}. @@ -361,39 +361,39 @@ user_binding_header_key(_) -> false. make_string_table(_KeyFilter, []) -> []; make_string_table(KeyFilter, [{K, V} | Rest]) -> case KeyFilter(K) of - false -> - make_string_table(KeyFilter, Rest); - NewK -> - [{list_to_binary(NewK), longstr, list_to_binary(V)} - | make_string_table(KeyFilter, Rest)] + false -> + make_string_table(KeyFilter, Rest); + NewK -> + [{list_to_binary(NewK), longstr, list_to_binary(V)} + | make_string_table(KeyFilter, Rest)] end. transactional(Frame) -> case stomp_frame:header(Frame, "transaction") of - {ok, Transaction} -> - {yes, Transaction}; - not_found -> - no + {ok, Transaction} -> + {yes, Transaction}; + not_found -> + no end. transactional_action(Frame, Name, Fun, State) -> case transactional(Frame) of - {yes, Transaction} -> - Fun(Transaction, State); - no -> - {ok, send_error("Missing transaction", - Name ++ " must include a 'transaction' header\n", - State)} + {yes, Transaction} -> + Fun(Transaction, State); + no -> + {ok, send_error("Missing transaction", + Name ++ " must include a 'transaction' header\n", + State)} end. with_transaction(Transaction, State, Fun) -> case get({transaction, Transaction}) of - undefined -> - {ok, send_error("Bad transaction", - "Invalid transaction identifier: ~p~n", [Transaction], - State)}; - Actions -> - Fun(Actions, State) + undefined -> + {ok, send_error("Bad transaction", + "Invalid transaction identifier: ~p~n", [Transaction], + State)}; + Actions -> + Fun(Actions, State) end. begin_transaction(Transaction, State) -> @@ -402,27 +402,27 @@ begin_transaction(Transaction, State) -> extend_transaction(Transaction, Action, State0) -> with_transaction(Transaction, State0, - fun (Actions, State) -> - put({transaction, Transaction}, [Action | Actions]), - {ok, State} - end). + fun (Actions, State) -> + put({transaction, Transaction}, [Action | Actions]), + {ok, State} + end). commit_transaction(Transaction, State0) -> with_transaction(Transaction, State0, - fun (Actions, State) -> - FinalState = lists:foldr(fun perform_transaction_action/2, - State, - Actions), - erase({transaction, Transaction}), - {ok, FinalState} - end). + fun (Actions, State) -> + FinalState = lists:foldr(fun perform_transaction_action/2, + State, + Actions), + erase({transaction, Transaction}), + {ok, FinalState} + end). abort_transaction(Transaction, State0) -> with_transaction(Transaction, State0, - fun (_Actions, State) -> - erase({transaction, Transaction}), - {ok, State} - end). + fun (_Actions, State) -> + erase({transaction, Transaction}), + {ok, State} + end). perform_transaction_action({Method}, State) -> send_method(Method, State); @@ -432,144 +432,144 @@ perform_transaction_action({Method, Props, Body}, State) -> process_command("BEGIN", Frame, State) -> transactional_action(Frame, "BEGIN", fun begin_transaction/2, State); process_command("SEND", - Frame = #stomp_frame{headers = Headers, body = Body}, - State) -> + Frame = #stomp_frame{headers = Headers, body = Body}, + State) -> case stomp_frame:header(Frame, "destination") of - {ok, RoutingKeyStr} -> - ExchangeStr = stomp_frame:header(Frame, "exchange", ""), - Props = #'P_basic'{ - content_type = stomp_frame:binary_header(Frame, "content-type", <<"text/plain">>), - content_encoding = stomp_frame:binary_header(Frame, "content-encoding", undefined), - headers = make_string_table(fun user_header_key/1, Headers), - delivery_mode = stomp_frame:integer_header(Frame, "delivery-mode", undefined), - priority = stomp_frame:integer_header(Frame, "priority", undefined), - correlation_id = stomp_frame:binary_header(Frame, "correlation-id", undefined), - reply_to = stomp_frame:binary_header(Frame, "reply-to", undefined), - message_id = stomp_frame:binary_header(Frame, "amqp-message-id", undefined) - }, - Method = #'basic.publish'{exchange = list_to_binary(ExchangeStr), - routing_key = list_to_binary(RoutingKeyStr), - mandatory = false, - immediate = false}, - case transactional(Frame) of - {yes, Transaction} -> - extend_transaction(Transaction, {Method, Props, Body}, State); - no -> - {ok, send_method(Method, Props, Body, State)} - end; - not_found -> - {ok, send_error("Missing destination", - "SEND must include a 'destination', and optional 'exchange' header\n", - State)} + {ok, RoutingKeyStr} -> + ExchangeStr = stomp_frame:header(Frame, "exchange", ""), + Props = #'P_basic'{ + content_type = stomp_frame:binary_header(Frame, "content-type", <<"text/plain">>), + content_encoding = stomp_frame:binary_header(Frame, "content-encoding", undefined), + headers = make_string_table(fun user_header_key/1, Headers), + delivery_mode = stomp_frame:integer_header(Frame, "delivery-mode", undefined), + priority = stomp_frame:integer_header(Frame, "priority", undefined), + correlation_id = stomp_frame:binary_header(Frame, "correlation-id", undefined), + reply_to = stomp_frame:binary_header(Frame, "reply-to", undefined), + message_id = stomp_frame:binary_header(Frame, "amqp-message-id", undefined) + }, + Method = #'basic.publish'{exchange = list_to_binary(ExchangeStr), + routing_key = list_to_binary(RoutingKeyStr), + mandatory = false, + immediate = false}, + case transactional(Frame) of + {yes, Transaction} -> + extend_transaction(Transaction, {Method, Props, Body}, State); + no -> + {ok, send_method(Method, Props, Body, State)} + end; + not_found -> + {ok, send_error("Missing destination", + "SEND must include a 'destination', and optional 'exchange' header\n", + State)} end; process_command("ACK", Frame, State = #state{session_id = SessionId}) -> case stomp_frame:header(Frame, "message-id") of - {ok, IdStr} -> - IdPrefix = SessionId ++ "_", - case string:substr(IdStr, 1, length(IdPrefix)) of - IdPrefix -> - DeliveryTag = list_to_integer(string:substr(IdStr, length(IdPrefix) + 1)), - Method = #'basic.ack'{delivery_tag = DeliveryTag, - multiple = false}, - case transactional(Frame) of - {yes, Transaction} -> - extend_transaction(Transaction, {Method}, State); - no -> - {ok, send_method(Method, State)} - end; - _ -> - rabbit_misc:die(command_invalid, 'basic.ack') - end; - not_found -> - {ok, send_error("Missing message-id", - "ACK must include a 'message-id' header\n", - State)} + {ok, IdStr} -> + IdPrefix = SessionId ++ "_", + case string:substr(IdStr, 1, length(IdPrefix)) of + IdPrefix -> + DeliveryTag = list_to_integer(string:substr(IdStr, length(IdPrefix) + 1)), + Method = #'basic.ack'{delivery_tag = DeliveryTag, + multiple = false}, + case transactional(Frame) of + {yes, Transaction} -> + extend_transaction(Transaction, {Method}, State); + no -> + {ok, send_method(Method, State)} + end; + _ -> + rabbit_misc:die(command_invalid, 'basic.ack') + end; + not_found -> + {ok, send_error("Missing message-id", + "ACK must include a 'message-id' header\n", + State)} end; process_command("COMMIT", Frame, State) -> transactional_action(Frame, "COMMIT", fun commit_transaction/2, State); process_command("ABORT", Frame, State) -> transactional_action(Frame, "ABORT", fun abort_transaction/2, State); process_command("SUBSCRIBE", - Frame = #stomp_frame{headers = Headers}, - State) -> + Frame = #stomp_frame{headers = Headers}, + State) -> AckMode = case stomp_frame:header(Frame, "ack", "auto") of - "auto" -> auto; - "client" -> client - end, + "auto" -> auto; + "client" -> client + end, case stomp_frame:header(Frame, "destination") of - {ok, QueueStr} -> - ConsumerTag = case stomp_frame:header(Frame, "id") of - {ok, Str} -> - list_to_binary("T_" ++ Str); - not_found -> - list_to_binary("Q_" ++ QueueStr) - end, - Queue = list_to_binary(QueueStr), - State1 = send_method(#'queue.declare'{queue = Queue, - passive = stomp_frame:boolean_header(Frame, "passive", false), - durable = stomp_frame:boolean_header(Frame, "durable", false), - exclusive = stomp_frame:boolean_header(Frame, "exclusive", false), - auto_delete = stomp_frame:boolean_header(Frame, "auto-delete", true), - nowait = true, - arguments = - make_string_table(fun user_queue_header_key/1, - Headers)}, - State), - State2 = case stomp_frame:header(Frame, "exchange") of - {ok, ExchangeStr } -> - Exchange = list_to_binary(ExchangeStr), - RoutingKeyStr = stomp_frame:header(Frame, "routing_key", ""), - RoutingKey = list_to_binary(RoutingKeyStr), - send_method(#'queue.bind'{queue = Queue, - exchange = Exchange, - routing_key = RoutingKey, - nowait = true, - arguments = - make_string_table( - fun user_binding_header_key/1, - Headers)}, - State1); - not_found -> State1 - end, - State3 = send_method(#'basic.consume'{queue = Queue, - consumer_tag = ConsumerTag, - no_local = false, - no_ack = (AckMode == auto), - exclusive = false, - nowait = true}, - State2), - {ok, State3}; - not_found -> - {ok, send_error("Missing destination", - "SUBSCRIBE must include a 'destination' header\n", - State)} + {ok, QueueStr} -> + ConsumerTag = case stomp_frame:header(Frame, "id") of + {ok, Str} -> + list_to_binary("T_" ++ Str); + not_found -> + list_to_binary("Q_" ++ QueueStr) + end, + Queue = list_to_binary(QueueStr), + State1 = send_method(#'queue.declare'{queue = Queue, + passive = stomp_frame:boolean_header(Frame, "passive", false), + durable = stomp_frame:boolean_header(Frame, "durable", false), + exclusive = stomp_frame:boolean_header(Frame, "exclusive", false), + auto_delete = stomp_frame:boolean_header(Frame, "auto-delete", true), + nowait = true, + arguments = + make_string_table(fun user_queue_header_key/1, + Headers)}, + State), + State2 = case stomp_frame:header(Frame, "exchange") of + {ok, ExchangeStr } -> + Exchange = list_to_binary(ExchangeStr), + RoutingKeyStr = stomp_frame:header(Frame, "routing_key", ""), + RoutingKey = list_to_binary(RoutingKeyStr), + send_method(#'queue.bind'{queue = Queue, + exchange = Exchange, + routing_key = RoutingKey, + nowait = true, + arguments = + make_string_table( + fun user_binding_header_key/1, + Headers)}, + State1); + not_found -> State1 + end, + State3 = send_method(#'basic.consume'{queue = Queue, + consumer_tag = ConsumerTag, + no_local = false, + no_ack = (AckMode == auto), + exclusive = false, + nowait = true}, + State2), + {ok, State3}; + not_found -> + {ok, send_error("Missing destination", + "SUBSCRIBE must include a 'destination' header\n", + State)} end; process_command("UNSUBSCRIBE", Frame, State) -> ConsumerTag = case stomp_frame:header(Frame, "id") of - {ok, IdStr} -> - list_to_binary("T_" ++ IdStr); - not_found -> - case stomp_frame:header(Frame, "destination") of - {ok, QueueStr} -> - list_to_binary("Q_" ++ QueueStr); - not_found -> - missing - end - end, + {ok, IdStr} -> + list_to_binary("T_" ++ IdStr); + not_found -> + case stomp_frame:header(Frame, "destination") of + {ok, QueueStr} -> + list_to_binary("Q_" ++ QueueStr); + not_found -> + missing + end + end, if - ConsumerTag == missing -> - {ok, send_error("Missing destination or id", - "UNSUBSCRIBE must include a 'destination' or 'id' header\n", - State)}; - true -> - {ok, send_method(#'basic.cancel'{consumer_tag = ConsumerTag, - nowait = true}, - State)} + ConsumerTag == missing -> + {ok, send_error("Missing destination or id", + "UNSUBSCRIBE must include a 'destination' or 'id' header\n", + State)}; + true -> + {ok, send_method(#'basic.cancel'{consumer_tag = ConsumerTag, + nowait = true}, + State)} end; process_command("DISCONNECT", _Frame, State) -> {ok, send_method(#'channel.close'{reply_code = 200, reply_text = <<"">>, - class_id = 0, method_id = 0}, State)}; + class_id = 0, method_id = 0}, State)}; process_command(Command, _Frame, State) -> {ok, send_error("Bad command", - "Could not interpret command " ++ Command ++ "\n", - State)}. + "Could not interpret command " ++ Command ++ "\n", + State)}. diff --git a/deps/rabbitmq_stomp/src/stomp_frame.erl b/deps/rabbitmq_stomp/src/stomp_frame.erl index 9c39c4749e..11a6b5c490 100644 --- a/deps/rabbitmq_stomp/src/stomp_frame.erl +++ b/deps/rabbitmq_stomp/src/stomp_frame.erl @@ -60,14 +60,14 @@ parse_headers([$\n | Rest], ParseState = #hstate{state = command, acc = []}) -> parse_headers(Rest, ParseState); parse_headers([$\n | Rest], ParseState = #hstate{state = command, acc = Acc}) -> parse_headers(Rest, ParseState#hstate{state = key, acc = [], - command = lists:reverse(Acc)}); + command = lists:reverse(Acc)}); parse_headers([$\n | Rest], _ParseState = #hstate{state = key, acc = Acc, - command = Command, headers = Headers}) -> + command = Command, headers = Headers}) -> case Acc of - [] -> - {ok, Command, Headers, Rest}; - _ -> - {error, {bad_header_key, lists:reverse(Acc)}} + [] -> + {ok, Command, Headers, Rest}; + _ -> + {error, {bad_header_key, lists:reverse(Acc)}} end; parse_headers([$: | Rest], ParseState = #hstate{state = key, acc = Acc}) -> parse_headers(Rest, ParseState#hstate{state = eatspace, acc = [], key = lists:reverse(Acc)}); @@ -76,82 +76,82 @@ parse_headers([$ | Rest], ParseState = #hstate{state = eatspace}) -> parse_headers(Input, ParseState = #hstate{state = eatspace}) -> parse_headers(Input, ParseState#hstate{state = value}); parse_headers([$\n | Rest], ParseState = #hstate{state = value, acc = Acc, key = Key, - headers = Headers}) -> + headers = Headers}) -> parse_headers(Rest, ParseState#hstate{state = key, acc = [], - headers = [{Key, lists:reverse(Acc)} - | Headers]}); + headers = [{Key, lists:reverse(Acc)} + | Headers]}); parse_headers([Ch | Rest], ParseState = #hstate{acc = Acc}) -> if - Ch < 32 -> - {error, {bad_character, Ch}}; - true -> - parse_headers(Rest, ParseState#hstate{acc = [Ch | Acc]}) + Ch < 32 -> + {error, {bad_character, Ch}}; + true -> + parse_headers(Rest, ParseState#hstate{acc = [Ch | Acc]}) end. header(#stomp_frame{headers = Headers}, Key) -> case lists:keysearch(Key, 1, Headers) of - {value, {_, Str}} -> - {ok, Str}; - _ -> - not_found + {value, {_, Str}} -> + {ok, Str}; + _ -> + not_found end. header(#stomp_frame{headers = Headers}, Key, DefaultValue) -> case lists:keysearch(Key, 1, Headers) of - {value, {_, Str}} -> - Str; - _ -> - DefaultValue + {value, {_, Str}} -> + Str; + _ -> + DefaultValue end. boolean_header(#stomp_frame{headers = Headers}, Key) -> boolean_header(Headers, Key); boolean_header(Headers, Key) -> case lists:keysearch(Key, 1, Headers) of - {value, {_, "true"}} -> - {ok, true}; - {value, {_, "false"}} -> - {ok, false}; - _ -> - not_found + {value, {_, "true"}} -> + {ok, true}; + {value, {_, "false"}} -> + {ok, false}; + _ -> + not_found end. boolean_header(H, Key, D) -> case boolean_header(H, Key) of - {ok, V} -> - V; - not_found -> - D + {ok, V} -> + V; + not_found -> + D end. integer_header(#stomp_frame{headers = Headers}, Key) -> integer_header(Headers, Key); integer_header(Headers, Key) -> case lists:keysearch(Key, 1, Headers) of - {value, {_, Str}} -> - {ok, list_to_integer(string:strip(Str))}; - _ -> - not_found + {value, {_, Str}} -> + {ok, list_to_integer(string:strip(Str))}; + _ -> + not_found end. integer_header(H, Key, D) -> case integer_header(H, Key) of - {ok, V} -> - V; - not_found -> - D + {ok, V} -> + V; + not_found -> + D end. binary_header(F, K) -> case header(F, K) of - {ok, Str} -> {ok, list_to_binary(Str)}; - not_found -> not_found + {ok, Str} -> {ok, list_to_binary(Str)}; + not_found -> not_found end. binary_header(F, K, V) -> case header(F, K) of - {ok, Str} -> list_to_binary(Str); - not_found -> V + {ok, Str} -> list_to_binary(Str); + not_found -> V end. content_length(Headers) -> @@ -159,10 +159,10 @@ content_length(Headers) -> initial_body_state(Headers) -> case content_length(Headers) of - {ok, ByteCount} -> - #bstate{acc = [], remaining = ByteCount}; - not_found -> - #bstate{acc = [], remaining = unknown} + {ok, ByteCount} -> + #bstate{acc = [], remaining = ByteCount}; + not_found -> + #bstate{acc = [], remaining = unknown} end. parse_body([], State) -> @@ -175,10 +175,10 @@ parse_body([Ch | Rest], State = #bstate{acc = Acc, remaining = unknown}) -> parse_body(Rest, State#bstate{acc = [Ch | Acc]}); parse_body([Ch | Rest], State = #bstate{acc = Acc, remaining = N}) -> if - N > 0 -> - parse_body(Rest, State#bstate{acc = [Ch | Acc], remaining = N - 1}); - true -> - {error, missing_body_terminator} + N > 0 -> + parse_body(Rest, State#bstate{acc = [Ch | Acc], remaining = N - 1}); + true -> + {error, missing_body_terminator} end. initial_state() -> @@ -186,36 +186,36 @@ initial_state() -> parse(Rest, {headers, HState}) -> case parse_headers(Rest, HState) of - {more, HState1} -> - {more, {headers, HState1}}; - {ok, Command, Headers, Rest1} -> - parse(Rest1, #stomp_frame{command = Command, - headers = Headers, - body = initial_body_state(Headers)}); - E = {error, _} -> - E + {more, HState1} -> + {more, {headers, HState1}}; + {ok, Command, Headers, Rest1} -> + parse(Rest1, #stomp_frame{command = Command, + headers = Headers, + body = initial_body_state(Headers)}); + E = {error, _} -> + E end; parse(Rest, Frame = #stomp_frame{body = BState}) -> case parse_body(Rest, BState) of - {more, BState1} -> - {more, Frame#stomp_frame{body = BState1}}; - {ok, Body, Rest1} -> - {ok, Frame#stomp_frame{body = Body}, Rest1}; - E = {error, _} -> - E + {more, BState1} -> + {more, Frame#stomp_frame{body = BState1}}; + {ok, Body, Rest1} -> + {ok, Frame#stomp_frame{body = Body}, Rest1}; + E = {error, _} -> + E end. serialize(#stomp_frame{command = Command, - headers = Headers, - body = Body}) -> + headers = Headers, + body = Body}) -> Len = length(Body), [Command, $\n, lists:map(fun serialize_header/1, lists:keydelete("content-length", 1, Headers)), if - Len > 0 -> - ["content-length:", integer_to_list(length(Body)), $\n]; - true -> - [] + Len > 0 -> + ["content-length:", integer_to_list(length(Body)), $\n]; + true -> + [] end, $\n, Body,