Merge default and resolve clashes.

This commit is contained in:
Steve Powell 2012-08-15 17:22:15 +01:00
commit 769254ae61
7 changed files with 210 additions and 101 deletions

View File

@ -18,6 +18,7 @@
-define(HEADER_ACK, "ack").
-define(HEADER_AMQP_MESSAGE_ID, "amqp-message-id").
-define(HEADER_CONTENT_ENCODING, "content-encoding").
-define(HEADER_CONTENT_LENGTH, "content-length").
-define(HEADER_CONTENT_TYPE, "content-type").
-define(HEADER_CORRELATION_ID, "correlation-id").
-define(HEADER_DESTINATION, "destination").

View File

@ -20,6 +20,7 @@
-module(rabbit_stomp_frame).
-include("rabbit_stomp_frame.hrl").
-include("rabbit_stomp_headers.hrl").
-export([parse/2, initial_state/0]).
-export([header/2, header/3,
@ -30,69 +31,130 @@
initial_state() -> none.
parse(Content, {resume, Fun}) -> Fun(Content);
parse(Content, none) -> parse_command(Content, []).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% STOMP 1.1 frames basic syntax
%% Rabbit modifications:
%% o CR LF is equivalent to LF in all element terminators (eol).
%% o Escape codes for header names and values include \r for CR
%% and CR is not allowed.
%% o Header names and values are not limited to UTF-8 strings.
%%
%% frame_seq ::= *(noise frame)
%% noise ::= *(NUL | eol)
%% eol ::= LF | CR LF
%% frame ::= cmd hdrs body NUL
%% body ::= *OCTET
%% cmd ::= 1*NOTEOL eol
%% hdrs ::= *hdr eol
%% hdr ::= hdrname COLON hdrvalue eol
%% hdrname ::= 1*esc_char
%% hdrvalue ::= *esc_char
%% esc_char ::= HDROCT | BACKSLASH ESCCODE
%%
%% Terms in CAPS all represent sets (alternatives) of single octets.
%% They are defined here using a small extension of BNF, minus (-):
%%
%% term1 - term2 denotes any of the possibilities in term1
%% excluding those in term2.
%% In this grammar minus is only used for sets of single octets.
%%
%% OCTET ::= '00'x..'FF'x % any octet
%% NUL ::= '00'x % the zero octet
%% LF ::= '\n' % '0a'x newline or linefeed
%% CR ::= '\r' % '0d'x carriage return
%% NOTEOL ::= OCTET - (CR | LF) % any octet except CR or LF
%% BACKSLASH ::= '\\' % '5c'x
%% ESCCODE ::= 'c' | 'n' | 'r' | BACKSLASH
%% COLON ::= ':'
%% HDROCT ::= NOTEOL - (COLON | BACKSLASH)
%% % octets allowed in a header
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
parse_command(<<>>, Acc) ->
more(fun(Rest) -> parse_command(Rest, Acc) end);
parse_command(<<$\n, Rest/binary>>, []) -> % inter-frame newline
parse_command(Rest, []);
parse_command(<<0, Rest/binary>>, []) -> % empty frame
parse_command(Rest, []);
parse_command(<<$\n, Rest/binary>>, Acc) -> % end command
parse_headers(Rest, lists:reverse(Acc));
parse_command(<<Ch:8, Rest/binary>>, Acc) ->
parse_command(Rest, [Ch | Acc]).
%% explicit frame characters
-define(NUL, 0).
-define(CR, $\r).
-define(LF, $\n).
-define(BSL, $\\).
-define(COLON, $:).
parse_headers(Rest, Command) -> % begin headers
parse_headers(Rest, #stomp_frame{command = Command}, [], []).
%% header escape codes
-define(LF_ESC, $n).
-define(BSL_ESC, $\\).
-define(COLON_ESC, $c).
-define(CR_ESC, $r).
parse_headers(<<>>, Frame, HeaderAcc, KeyAcc) ->
more(fun(Rest) -> parse_headers(Rest, Frame, HeaderAcc, KeyAcc) end);
parse_headers(<<$\n, Rest/binary>>, Frame, HeaderAcc, _KeyAcc) -> % end headers
parse_body(Rest, Frame#stomp_frame{headers = HeaderAcc});
parse_headers(<<$:, Rest/binary>>, Frame, HeaderAcc, KeyAcc) -> % end key
parse_header_value(Rest, Frame, HeaderAcc, lists:reverse(KeyAcc));
parse_headers(<<Ch:8, Rest/binary>>, Frame, HeaderAcc, KeyAcc) ->
parse_headers(Rest, Frame, HeaderAcc, [Ch | KeyAcc]).
%% parser state
-record(state, {acc, cmd, hdrs, hdrname}).
parse_header_value(Rest, Frame, HeaderAcc, Key) -> % begin header value
parse_header_value(Rest, Frame, HeaderAcc, Key, []).
parse(Content, {resume, Continuation}) -> Continuation(Content);
parse(Content, none ) -> parser(Content, noframe, #state{}).
parse_header_value(<<>>, Frame, HeaderAcc, Key, ValAcc) ->
more(fun(Rest) -> parse_header_value(Rest, Frame, HeaderAcc, Key, ValAcc)
end);
parse_header_value(<<$\n, Rest/binary>>, Frame, HeaderAcc, Key, ValAcc) ->
% end value
parse_headers(Rest, Frame,
insert_header(HeaderAcc, Key, lists:reverse(ValAcc)),
[]);
parse_header_value(<<$\\, Rest/binary>>, Frame, HeaderAcc, Key, ValAcc) ->
parse_header_value_escape(Rest, Frame, HeaderAcc, Key, ValAcc);
parse_header_value(<<Ch:8, Rest/binary>>, Frame, HeaderAcc, Key, ValAcc) ->
parse_header_value(Rest, Frame, HeaderAcc, Key, [Ch | ValAcc]).
more(Continuation) -> {more, {resume, Continuation}}.
parse_header_value_escape(<<>>, Frame, HeaderAcc, Key, ValAcc) ->
more(fun(Rest) ->
parse_header_value_escape(Rest, Frame, HeaderAcc, Key, ValAcc)
end);
parse_header_value_escape(<<Ch:8, Rest/binary>>, Frame,
HeaderAcc, Key, ValAcc) ->
case unescape(Ch) of
{ok, EscCh} -> parse_header_value(Rest, Frame, HeaderAcc, Key,
[EscCh | ValAcc]);
error -> {error, {bad_escape, Ch}}
end.
%% Single-function parser: Term :: noframe | command | headers | hdrname | hdrvalue
%% general more and line-end detection
parser(<<>>, Term , State) -> more(fun(Rest) -> parser(Rest, Term, State) end);
parser(<<?CR>>, Term , State) -> more(fun(Rest) -> parser(<<?CR, Rest/binary>>, Term, State) end);
parser(<<?CR, ?LF, Rest/binary>>, Term , State) -> parser(<<?LF, Rest/binary>>, Term, State);
parser(<<?CR, Ch:8, _Rest/binary>>, Term , _State) -> {error, {unexpected_chars(Term), [?CR, Ch]}};
%% escape processing (only in hdrname and hdrvalue terms)
parser(<<?BSL>>, Term , State) -> more(fun(Rest) -> parser(<<?BSL, Rest/binary>>, Term, State) end);
parser(<<?BSL, Ch:8, Rest/binary>>, Term , State)
when Term == hdrname;
Term == hdrvalue -> unescape(Ch, fun(Ech) -> parser(Rest, Term, accum(Ech, State)) end);
%% inter-frame noise
parser(<<?NUL, Rest/binary>>, noframe , State) -> parser(Rest, noframe, State);
parser(<<?LF, Rest/binary>>, noframe , State) -> parser(Rest, noframe, State);
%% detect transitions
parser( Rest, noframe , State) -> goto(noframe, command, Rest, State);
parser(<<?LF, Rest/binary>>, command , State) -> goto(command, headers, Rest, State);
parser(<<?LF, Rest/binary>>, headers , State) -> goto(headers, body, Rest, State);
parser( Rest, headers , State) -> goto(headers, hdrname, Rest, State);
parser(<<?COLON, Rest/binary>>, hdrname , State) -> goto(hdrname, hdrvalue, Rest, State);
parser(<<?LF, Rest/binary>>, hdrname , State) -> goto(hdrname, headers, Rest, State);
parser(<<?LF, Rest/binary>>, hdrvalue, State) -> goto(hdrvalue, headers, Rest, State);
%% trap invalid colons
parser(<<?COLON, Rest/binary>>, hdrvalue, State) -> {error, {unexpected_char_in_header_value, [?COLON]}};
%% accumulate
parser(<<Ch:8, Rest/binary>>, Term , State) -> parser(Rest, Term, accum(Ch, State)).
insert_header(Headers, Key, Value) ->
case lists:keysearch(Key, 1, Headers) of
{value, _} -> Headers; % first header only
false -> [{Key, Value} | Headers]
%% state transitions
goto(noframe, command, Rest, State ) -> parser(Rest, command, State#state{acc = []});
goto(command, headers, Rest, State = #state{acc = Acc} ) -> parser(Rest, headers, State#state{cmd = lists:reverse(Acc), hdrs = []});
goto(headers, body, Rest, #state{cmd = Cmd, hdrs = Hdrs}) -> parse_body(Rest, #stomp_frame{command = Cmd, headers = Hdrs});
goto(headers, hdrname, Rest, State ) -> parser(Rest, hdrname, State#state{acc = []});
goto(hdrname, hdrvalue, Rest, State = #state{acc = Acc} ) -> parser(Rest, hdrvalue, State#state{acc = [], hdrname = lists:reverse(Acc)});
goto(hdrname, headers, _Rest, #state{acc = Acc} ) -> {error, {header_no_value, lists:reverse(Acc)}}; % badly formed header -- fatal error
goto(hdrvalue, headers, Rest, State = #state{acc = Acc, hdrs = Headers, hdrname = HdrName}) ->
parser(Rest, headers, State#state{hdrs = insert_header(Headers, HdrName, lists:reverse(Acc))}).
%% error atom
unexpected_chars(noframe) -> unexpected_chars_between_frames;
unexpected_chars(command) -> unexpected_chars_in_command;
unexpected_chars(hdrname) -> unexpected_chars_in_header;
unexpected_chars(hdrvalue) -> unexpected_chars_in_header;
unexpected_chars(_Term) -> unexpected_chars.
%% general accumulation
accum(Ch, State = #state{acc = Acc}) -> State#state{acc = [Ch | Acc]}.
%% resolve escapes (with error processing)
unescape(?LF_ESC, Fun) -> Fun(?LF);
unescape(?BSL_ESC, Fun) -> Fun(?BSL);
unescape(?COLON_ESC, Fun) -> Fun(?COLON);
unescape(?CR_ESC, Fun) -> Fun(?CR);
unescape(Ch, _Fun) -> {error, {bad_escape, [?BSL, Ch]}}.
%% insert header unless aleady seen
insert_header(Headers, Name, Value) ->
case lists:keymember(Name, 1, Headers) of
true -> Headers; % first header only
false -> [{Name, Value} | Headers]
end.
parse_body(Content, Frame) ->
parse_body(Content, Frame, [],
integer_header(Frame, "content-length", unknown)).
integer_header(Frame, ?HEADER_CONTENT_LENGTH, unknown)).
parse_body(Content, Frame, Chunks, unknown) ->
parse_body2(Content, Frame, Chunks, case firstnull(Content) of
@ -117,8 +179,6 @@ parse_body2(Content, Frame, Chunks, {done, Pos}) ->
finalize_chunk(<<>>, Chunks) -> Chunks;
finalize_chunk(Chunk, Chunks) -> [Chunk | Chunks].
more(Continuation) -> {more, {resume, Continuation}}.
default_value({ok, Value}, _DefaultValue) -> Value;
default_value(not_found, DefaultValue) -> DefaultValue.
@ -162,27 +222,27 @@ serialize(#stomp_frame{command = Command,
headers = Headers,
body_iolist = BodyFragments}) ->
Len = iolist_size(BodyFragments),
[Command, $\n,
[Command, ?LF,
lists:map(fun serialize_header/1,
lists:keydelete("content-length", 1, Headers)),
lists:keydelete(?HEADER_CONTENT_LENGTH, 1, Headers)),
if
Len > 0 -> ["content-length:", integer_to_list(Len), $\n];
Len > 0 -> [?HEADER_CONTENT_LENGTH ++ ":", integer_to_list(Len), ?LF];
true -> []
end,
$\n, BodyFragments, 0].
?LF, BodyFragments, 0].
serialize_header({K, V}) when is_integer(V) -> [K, $:, integer_to_list(V), $\n];
serialize_header({K, V}) when is_list(V) -> [K, $:, [escape(C) || C <- V], $\n].
serialize_header({K, V}) when is_integer(V) -> hdr(escape(K), integer_to_list(V));
serialize_header({K, V}) when is_list(V) -> hdr(escape(K), escape(V)).
unescape($n) -> {ok, $\n};
unescape($\\) -> {ok, $\\};
unescape($c) -> {ok, $:};
unescape(_) -> error.
hdr(K, V) -> [K, ?COLON, V, ?LF].
escape($:) -> "\\c";
escape($\\) -> "\\\\";
escape($\n) -> "\\n";
escape(C) -> C.
escape(Str) -> [escape1(Ch) || Ch <- Str].
escape1(?COLON) -> [?BSL, ?COLON_ESC];
escape1(?BSL) -> [?BSL, ?BSL_ESC];
escape1(?LF) -> [?BSL, ?LF_ESC];
escape1(?CR) -> [?BSL, ?CR_ESC];
escape1(Ch) -> Ch.
firstnull(Content) -> firstnull(Content, 0).

View File

@ -196,13 +196,13 @@ process_connect(Implicit, Frame,
{Username, Creds} = creds(Frame1, SSLLoginName, Config),
{ok, DefaultVHost} =
application:get_env(rabbit, default_vhost),
{ProtoName, _} = AdapterInfo#adapter_info.protocol,
{ProtoName, _} = AdapterInfo#amqp_adapter_info.protocol,
Res = do_login(
Username, Creds,
login_header(Frame1, ?HEADER_HOST, DefaultVHost),
login_header(Frame1, ?HEADER_HEART_BEAT, "0,0"),
AdapterInfo#adapter_info{
protocol = {ProtoName, Version}}, Version,
AdapterInfo#amqp_adapter_info{
protocol = {ProtoName, Version}}, Version,
StateN#state{frame_transformer = FT}),
case {Res, Implicit} of
{{ok, _, StateN1}, implicit} -> ok(StateN1);
@ -500,21 +500,22 @@ do_login(Username, Creds, VirtualHost, Heartbeat, AdapterInfo, Version,
{error, auth_failure} ->
rabbit_log:error("STOMP login failed - auth_failure "
"(user vanished)~n"),
error("Bad CONNECT", "Authentication failure", State);
error("Bad CONNECT", "User failure after authentication", State);
{error, access_refused} ->
rabbit_log:warning("STOMP login failed - access_refused "
"(vhost access not allowed)~n"),
error("Bad CONNECT", "Authentication failure", State)
error("Bad CONNECT", "Virtual host '" ++
binary_to_list(VirtualHost) ++
"' access denied", State)
end;
{refused, Msg, Args} ->
rabbit_log:warning("STOMP login failed: " ++ Msg ++ "~n", Args),
error("Bad CONNECT", "Authentication failure", State)
error("Bad CONNECT", "Access refused: " ++ Msg ++ "~n", Args, State)
end.
server_header() ->
Props = rabbit_reader:server_properties(?PROTOCOL),
{_, Product} = rabbit_misc:table_lookup(Props, <<"product">>),
{_, Version} = rabbit_misc:table_lookup(Props, <<"version">>),
{ok, Product} = application:get_key(rabbit, id),
{ok, Version} = application:get_key(rabbit, vsn),
rabbit_misc:format("~s/~s", [Product, Version]).
do_subscribe(Destination, DestHdr, Frame,
@ -962,7 +963,7 @@ ok(Command, Headers, BodyFragments, State) ->
body_iolist = BodyFragments}, State}.
amqp_death(ReplyCode, Explanation, State) ->
ErrorName = ?PROTOCOL:amqp_exception(ReplyCode),
ErrorName = amqp_connection:error_atom(ReplyCode),
ErrorDesc = rabbit_misc:format("~s~n", [Explanation]),
log_error(ErrorName, ErrorDesc, none),
{stop, normal, send_error(atom_to_list(ErrorName), ErrorDesc, State)}.

View File

@ -174,13 +174,13 @@ adapter_info(Sock) ->
{ok, Res3} -> Res3;
_ -> unknown
end,
#adapter_info{protocol = {'STOMP', 0},
name = list_to_binary(Name),
address = Addr,
port = Port,
peer_address = PeerAddr,
peer_port = PeerPort,
additional_info = maybe_ssl_info(Sock)}.
#amqp_adapter_info{protocol = {'STOMP', 0},
name = list_to_binary(Name),
address = Addr,
port = Port,
peer_address = PeerAddr,
peer_port = PeerPort,
additional_info = maybe_ssl_info(Sock)}.
maybe_ssl_info(Sock) ->
case rabbit_net:is_ssl(Sock) of

View File

@ -80,20 +80,20 @@ class TestLifecycle(base.BaseTest):
''' Test bad username'''
self.bad_connect(stomp.Connection(user="gust",
passcode="guest"),
"Authentication failure\n")
"Access refused: user 'gust' - invalid credentials\n")
def test_bad_password(self):
''' Test bad password'''
self.bad_connect(stomp.Connection(user="guest",
passcode="gust"),
"Authentication failure\n")
"Access refused: user 'guest' - invalid credentials\n")
def test_bad_vhost(self):
''' Test bad virtual host'''
self.bad_connect(stomp.Connection(user="guest",
passcode="guest",
virtual_host="//"),
"Authentication failure\n")
"Virtual host '//' access denied\n")
def bad_connect(self, new_conn, expected):
self.conn.disconnect()

View File

@ -33,7 +33,7 @@ test_messages_not_dropped_on_disconnect() ->
[integer_to_list(Count)]) || Count <- lists:seq(1, 1000)],
rabbit_stomp_client:disconnect(Client),
QName = rabbit_misc:r(<<"/">>, queue, <<"bulk-test">>),
timer:sleep(1000),
timer:sleep(3000),
rabbit_amqqueue:with(
QName, fun(Q) ->
1000 = pget(messages, rabbit_amqqueue:info(Q, [messages]))

View File

@ -19,12 +19,20 @@
-include_lib("eunit/include/eunit.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
-include("rabbit_stomp_frame.hrl").
-include("rabbit_stomp_headers.hrl").
parse_simple_frame_test() ->
parse_simple_frame_gen("\n").
parse_simple_frame_crlf_test() ->
parse_simple_frame_gen("\r\n").
parse_simple_frame_gen(Term) ->
Headers = [{"header1", "value1"}, {"header2", "value2"}],
Content = frame_string("COMMAND",
Headers,
"Body Content"),
"Body Content",
Term),
{"COMMAND", Frame, _State} = parse_complete(Content),
[?assertEqual({ok, Value},
rabbit_stomp_frame:header(Frame, Key)) ||
@ -34,7 +42,7 @@ parse_simple_frame_test() ->
parse_simple_frame_with_null_test() ->
Headers = [{"header1", "value1"}, {"header2", "value2"},
{"content-length", "12"}],
{?HEADER_CONTENT_LENGTH, "12"}],
Content = frame_string("COMMAND",
Headers,
"Body\0Content"),
@ -48,7 +56,7 @@ parse_simple_frame_with_null_test() ->
parse_large_content_frame_with_nulls_test() ->
BodyContent = string:copies("012345678\0", 1024),
Headers = [{"header1", "value1"}, {"header2", "value2"},
{"content-length", integer_to_list(string:len(BodyContent))}],
{?HEADER_CONTENT_LENGTH, integer_to_list(string:len(BodyContent))}],
Content = frame_string("COMMAND",
Headers,
BodyContent),
@ -68,11 +76,17 @@ parse_ignore_empty_frames_test() ->
parse_heartbeat_interframe_test() ->
{ok, #stomp_frame{command = "COMMAND"}, _Rest} = parse("\nCOMMAND\n\n\0").
parse_crlf_interframe_test() ->
{ok, #stomp_frame{command = "COMMAND"}, _Rest} = parse("\r\nCOMMAND\n\n\0").
parse_carriage_return_not_ignored_interframe_test() ->
{ok, #stomp_frame{command = "\rCOMMAND"}, _Rest} = parse("\rCOMMAND\n\n\0").
{error, {unexpected_chars_between_frames, "\rC"}} = parse("\rCOMMAND\n\n\0").
parse_carriage_return_mid_command_test() ->
{ok, #stomp_frame{command = "COMM\rAND"}, _Rest} = parse("COMM\rAND\n\n\0").
{error, {unexpected_chars_in_command, "\rA"}} = parse("COMM\rAND\n\n\0").
parse_carriage_return_end_command_test() ->
{error, {unexpected_chars_in_command, "\r\r"}} = parse("COMMAND\r\r\n\n\0").
parse_resume_mid_command_test() ->
First = "COMM",
@ -118,11 +132,41 @@ parse_multiple_headers_test() ->
{ok, Val} = rabbit_stomp_frame:header(Frame, "header"),
?assertEqual("correct", Val).
headers_escaping_roundtrip_test() ->
Content = "COMMAND\nheader:\\c\\n\\\\\n\n\0",
header_no_colon_test() ->
Content = "COMMAND\n"
"hdr1:val1\n"
"hdrerror\n"
"hdr2:val2\n"
"\n\0",
?assertEqual(parse(Content), {error, {header_no_value, "hdrerror"}}).
no_nested_escapes_test() ->
Content = "COM\\\\rAND\n" % no escapes
"hdr\\\\rname:" % one escape
"hdr\\\\rval\n\n\0", % one escape
{ok, Frame, _} = parse(Content),
{ok, Val} = rabbit_stomp_frame:header(Frame, "header"),
?assertEqual(":\n\\", Val),
?assertEqual(Frame,
#stomp_frame{command = "COM\\\\rAND",
headers = [{"hdr\\rname", "hdr\\rval"}],
body_iolist = []}).
header_name_with_cr_test() ->
Content = "COMMAND\nhead\rer:val\n\n\0",
{error, {unexpected_chars_in_header, "\re"}} = parse(Content).
header_value_with_cr_test() ->
Content = "COMMAND\nheader:val\rue\n\n\0",
{error, {unexpected_chars_in_header, "\ru"}} = parse(Content).
header_value_with_colon_test() ->
Content = "COMMAND\nheader:val:ue\n\n\0",
{error, {unexpected_char_in_header_value, ":"}} = parse(Content).
headers_escaping_roundtrip_test() ->
Content = "COMMAND\nhead\\r\\c\\ner:\\c\\n\\r\\\\\n\n\0",
{ok, Frame, _} = parse(Content),
{ok, Val} = rabbit_stomp_frame:header(Frame, "head\r:\ner"),
?assertEqual(":\n\r\\", Val),
Serialized = lists:flatten(rabbit_stomp_frame:serialize(Frame)),
?assertEqual(Content, rabbit_misc:format("~s", [Serialized])).
@ -136,7 +180,10 @@ parse_complete(Content) ->
{Command, Frame, State}.
frame_string(Command, Headers, BodyContent) ->
HeaderString =
lists:flatten([Key ++ ":" ++ Value ++ "\n" || {Key, Value} <- Headers]),
Command ++ "\n" ++ HeaderString ++ "\n" ++ BodyContent ++ "\0".
frame_string(Command, Headers, BodyContent, "\n").
frame_string(Command, Headers, BodyContent, Term) ->
HeaderString =
lists:flatten([Key ++ ":" ++ Value ++ Term || {Key, Value} <- Headers]),
Command ++ Term ++ HeaderString ++ Term ++ BodyContent ++ "\0".