From 861d2e8f2a96d04c16b1d171ee331044e357f7f4 Mon Sep 17 00:00:00 2001 From: Ben Hood <0x6e6562@gmail.com> Date: Wed, 5 Aug 2009 14:17:34 +0100 Subject: [PATCH] Introduced amqp_msg record to be friendlier to higher level code --- deps/amqp_client/include/amqp_client.hrl | 2 ++ deps/amqp_client/rabbit_common.app | 2 +- deps/amqp_client/src/amqp_channel.erl | 14 ++++++++++---- deps/amqp_client/src/amqp_rpc_client.erl | 6 ++---- deps/amqp_client/src/amqp_rpc_server.erl | 5 ++--- deps/amqp_client/src/lib_amqp.erl | 6 +----- deps/amqp_client/test/test_util.erl | 17 +++++++++-------- 7 files changed, 27 insertions(+), 25 deletions(-) diff --git a/deps/amqp_client/include/amqp_client.hrl b/deps/amqp_client/include/amqp_client.hrl index 0dc2eaed98..012d8eec91 100644 --- a/deps/amqp_client/include/amqp_client.hrl +++ b/deps/amqp_client/include/amqp_client.hrl @@ -23,6 +23,8 @@ %% Contributor(s): Ben Hood <0x6e6562@gmail.com>. %% +-record(amqp_msg, {props, payload}). + -record(connection_state, {username, password, serverhost, diff --git a/deps/amqp_client/rabbit_common.app b/deps/amqp_client/rabbit_common.app index a363d37bdf..57ce966b66 100644 --- a/deps/amqp_client/rabbit_common.app +++ b/deps/amqp_client/rabbit_common.app @@ -6,7 +6,7 @@ rabbit_reader, rabbit_framing, rabbit_framing_channel, - rabbit_binary_parser, + rabbit_basic, rabbit_binary_generator, rabbit_channel, rabbit_misc, diff --git a/deps/amqp_client/src/amqp_channel.erl b/deps/amqp_client/src/amqp_channel.erl index 0423118f39..8d807eaf42 100644 --- a/deps/amqp_client/src/amqp_channel.erl +++ b/deps/amqp_client/src/amqp_channel.erl @@ -198,6 +198,10 @@ return_handler(State = #channel_state{return_handler_pid = undefined}) -> return_handler(State = #channel_state{return_handler_pid = ReturnHandler}) -> {ReturnHandler, State}. +amqp_msg(Content) -> + {Props, Payload} = rabbit_basic:from_content(Content), + #amqp_msg{props = Props, payload = Payload}. + handle_method(ConsumeOk = #'basic.consume_ok'{consumer_tag = ConsumerTag}, State = #channel_state{anon_sub_requests = Anon, tagged_sub_requests = Tagged}) -> @@ -250,7 +254,7 @@ handle_method(Method, State) -> handle_method(Deliver = #'basic.deliver'{consumer_tag = ConsumerTag}, Content, State) -> Consumer = resolve_consumer(ConsumerTag, State), - Consumer ! {Deliver, Content}, + Consumer ! {Deliver, amqp_msg(Content)}, {noreply, State}; %% Why is the consumer a handle_method/3 call with the network driver, @@ -260,11 +264,11 @@ handle_method('basic.consume_ok', ConsumerTag, State) -> handle_method(BasicReturn = #'basic.return'{}, Content, State) -> {ReturnHandler, NewState} = return_handler(State), - ReturnHandler ! {BasicReturn, Content}, + ReturnHandler ! {BasicReturn, amqp_msg(Content)}, {noreply, NewState}; handle_method(Method, Content, State) -> - rpc_bottom_half( {Method, Content} , State). + rpc_bottom_half( {Method, amqp_msg(Content)} , State). %%--------------------------------------------------------------------------- %% gen_server callbacks @@ -288,8 +292,10 @@ handle_call({call, _Method, _Content}, _From, State = #channel_state{closing = true}) -> {reply, blocked, State}; -handle_call({call, Method, Content}, _From, +handle_call({call, Method, #amqp_msg{props = Props, + payload = Payload}}, _From, State = #channel_state{writer_pid = Writer, do3 = Do3}) -> + Content = rabbit_basic:build_content(Props, Payload), Do3(Writer, Method, Content), {reply, ok, State}; diff --git a/deps/amqp_client/src/amqp_rpc_client.erl b/deps/amqp_client/src/amqp_rpc_client.erl index a0cac71d48..e8afe88d9a 100644 --- a/deps/amqp_client/src/amqp_rpc_client.erl +++ b/deps/amqp_client/src/amqp_rpc_client.erl @@ -119,11 +119,9 @@ handle_info(#'basic.cancel_ok'{}, State) -> {stop, normal, State}; handle_info({#'basic.deliver'{}, - {content, ClassId, _Props, PropertiesBin, [Payload] }}, + #amqp_msg{props = #'P_basic'{correlation_id = <>}, + payload = Payload}}, State = #rpc_client_state{continuations = Conts}) -> - #'P_basic'{correlation_id = CorrelationId} - = rabbit_framing:decode_properties(ClassId, PropertiesBin), - <> = CorrelationId, From = dict:fetch(Id, Conts), gen_server:reply(From, Payload), {noreply, State#rpc_client_state{continuations = dict:erase(Id, Conts) }}. diff --git a/deps/amqp_client/src/amqp_rpc_server.erl b/deps/amqp_client/src/amqp_rpc_server.erl index 9d0e61407e..9afc847178 100644 --- a/deps/amqp_client/src/amqp_rpc_server.erl +++ b/deps/amqp_client/src/amqp_rpc_server.erl @@ -68,11 +68,10 @@ handle_info(#'basic.cancel_ok'{}, State) -> {stop, normal, State}; handle_info({#'basic.deliver'{}, - {content, ClassId, _Props, PropertiesBin, [Payload] }}, + #amqp_msg{props = Props, payload = Payload}}, State = #rpc_server_state{handler = Fun, channel = Channel}) -> #'P_basic'{correlation_id = CorrelationId, - reply_to = Q} = - rabbit_framing:decode_properties(ClassId, PropertiesBin), + reply_to = Q} = Props, Response = Fun(Payload), Properties = #'P_basic'{correlation_id = CorrelationId}, lib_amqp:publish(Channel, <<>>, Q, Response, Properties), diff --git a/deps/amqp_client/src/lib_amqp.erl b/deps/amqp_client/src/lib_amqp.erl index 3cd7aa138b..9a4c8c4c27 100644 --- a/deps/amqp_client/src/lib_amqp.erl +++ b/deps/amqp_client/src/lib_amqp.erl @@ -85,11 +85,7 @@ publish_internal(Fun, Channel, X, RoutingKey, BasicPublish = #'basic.publish'{exchange = X, routing_key = RoutingKey, mandatory = Mandatory}, - {ClassId, _MethodId} = rabbit_framing:method_id('basic.publish'), - Content = #content{class_id = ClassId, - properties = Properties, - properties_bin = none, - payload_fragments_rev = [Payload]}, + Content = #amqp_msg{props = Properties, payload = Payload}, Fun(Channel, BasicPublish, Content). %%--------------------------------------------------------------------------- diff --git a/deps/amqp_client/test/test_util.erl b/deps/amqp_client/test/test_util.erl index 161a565820..c71c541bff 100644 --- a/deps/amqp_client/test/test_util.erl +++ b/deps/amqp_client/test/test_util.erl @@ -126,8 +126,8 @@ get_and_assert_empty(Channel, Q) -> get_and_assert_equals(Channel, Q, Payload) -> Content = lib_amqp:get(Channel, Q), - #content{payload_fragments_rev = PayloadFragments} = Content, - ?assertMatch([Payload], PayloadFragments). + #amqp_msg{payload = Payload2} = Content, + ?assertMatch(Payload, Payload2). basic_get_test(Connection) -> Channel = lib_amqp:start_channel(Connection), @@ -135,8 +135,8 @@ basic_get_test(Connection) -> %% TODO: This could be refactored to use get_and_assert_equals, %% get_and_assert_empty .... would require another bug though :-) Content = lib_amqp:get(Channel, Q), - #content{payload_fragments_rev = PayloadFragments} = Content, - ?assertMatch([<<"foobar">>], PayloadFragments), + #amqp_msg{payload = Payload} = Content, + ?assertMatch(<<"foobar">>, Payload), BasicGetEmpty = lib_amqp:get(Channel, Q, false), ?assertMatch('basic.get_empty', BasicGetEmpty), lib_amqp:teardown(Connection, Channel). @@ -157,8 +157,8 @@ basic_return_test(Connection) -> #'basic.return'{reply_text = ReplyText, exchange = X} = BasicReturn, ?assertMatch(<<"unroutable">>, ReplyText), - #content{payload_fragments_rev = Payload2} = Content, - ?assertMatch([Payload], Payload2); + #amqp_msg{payload = Payload2} = Content, + ?assertMatch(Payload, Payload2); WhatsThis -> %% TODO investigate where this comes from io:format("Spurious message ~p~n", [WhatsThis]) @@ -275,7 +275,8 @@ sleeping_consumer(Channel, Sleep, Parent) -> after Sleep -> ok end, lib_amqp:ack(Channel, DeliveryTag), - sleeping_consumer(Channel, Sleep, Parent) + sleeping_consumer(Channel, Sleep, Parent); + X -> io:format("Got some X factor ~p~n",[X]) end. do_stop(Channel, Parent) -> @@ -330,7 +331,7 @@ pc_producer_loop(Channel, X, Key, Payload, NRemaining) -> pc_consumer_loop(Channel, Payload, NReceived) -> receive {#'basic.deliver'{}, - #content{payload_fragments_rev = [DeliveredPayload]}} -> + #amqp_msg{payload = DeliveredPayload}} -> case DeliveredPayload of Payload -> pc_consumer_loop(Channel, Payload, NReceived + 1);