make dispositions of unsettled messages

async
This commit is contained in:
kjnilsson 2017-02-07 11:22:34 +00:00
parent 678ef4362b
commit 1b43387aaf
4 changed files with 35 additions and 17 deletions

View File

@ -213,7 +213,6 @@ handle_input(expecting_frame_body, Data,
{PerfDesc, Payload} = rabbit_amqp1_0_binary_parser:parse(FrameBody),
Perf = rabbit_amqp1_0_framing:decode(PerfDesc),
error_logger:info_msg("PERF ~p~n", [Perf]),
% Frame = rabbit_amqp1_0_framing:decode_bin(FrameBody),
State2 = route_frame(Channel, FrameType, {Perf, Payload}, State1),
handle_input(expecting_frame_header, Rest, State2);
_ ->
@ -226,8 +225,8 @@ handle_input(StateName, Data, State) ->
route_frame(Channel, FrameType, {Performative, Payload} = Frame, State0) ->
{DestinationPid, State} = find_destination(Channel, FrameType, Performative,
State0),
error_logger:info_msg("ROUTING FRAME ~p -> (~p, ~p)",
[Frame, Channel, DestinationPid]),
% error_logger:info_msg("ROUTING FRAME ~p -> (~p, ~p)",
% [Frame, Channel, DestinationPid]),
case Payload of
<<>> -> ok = gen_fsm:send_event(DestinationPid, Performative);
_ -> ok = gen_fsm:send_event(DestinationPid, Frame)

View File

@ -54,7 +54,7 @@ flow_credit(#link_ref{role = receiver, session = Session,
% else it returns the delivery state from the disposition
% TODO: timeouts
-spec send(link_ref(), amqp10_msg:amqp10_msg()) ->
ok | insufficient_credit | amqp10_client_types:delivery_state().
{ok, non_neg_integer()} | {error, insufficient_credit | link_not_found}.
send(#link_ref{role = sender, session = Session,
link_handle = Handle}, Msg0) ->
Msg = amqp10_msg:set_handle(Handle, Msg0),
@ -68,7 +68,7 @@ accept(#link_ref{role = receiver, session = Session}, Msg) ->
-spec sender(pid(), binary(), binary()) -> {ok, link_ref()}.
sender(Session, Name, Target) ->
sender(Session, Name, Target, settled).
sender(Session, Name, Target, mixed). % mixed should work with any type of msg
-spec sender(pid(), binary(), binary(),
amqp10_client_session:snd_settle_mode()) -> {ok, link_ref()}.

View File

@ -143,7 +143,7 @@ attach(Session, Args) ->
gen_fsm:sync_send_event(Session, {attach, Args}).
-spec transfer(pid(), amqp10_msg:amqp10_msg(), timeout()) ->
ok | insufficient_credit | amqp10_client_types:delivery_state().
{ok, non_neg_integer()} | {error, insufficient_credit | link_not_found}.
transfer(Session, Amqp10Msg, Timeout) ->
[Transfer | Records] = amqp10_msg:to_amqp_records(Amqp10Msg),
gen_fsm:sync_send_event(Session, {transfer, Transfer, Records}, Timeout).
@ -346,7 +346,7 @@ mapped(#'v1_0.disposition'{role = true, settled = true, first = {uint, First},
case Acc of
#{Id := {_Handle, Receiver}} ->
S = translate_delivery_state(DeliveryState),
gen_fsm:reply(Receiver, S),
ok = notify_disposition(Receiver, {Id, S}),
maps:remove(Id, Acc);
_ -> Acc
end
@ -375,7 +375,8 @@ mapped({transfer, #'v1_0.transfer'{handle = {uint, OutHandle},
ok = send_transfer(Transfer, Parts, State),
% delay reply to caller until disposition frame is received
State1 = State#state{unsettled = #{NDI => {OutHandle, From}}},
{next_state, mapped, book_transfer_send(Link, State1)};
% {next_state, mapped, book_transfer_send(Link, State1)};
{reply, {ok, NDI}, mapped, book_transfer_send(Link, State1)};
_ ->
{reply, {error, link_not_found}, mapped, State}
end;
@ -385,13 +386,13 @@ mapped({transfer, #'v1_0.transfer'{handle = {uint, OutHandle}} = Transfer0,
case Links of
#{OutHandle := #link{link_credit = LC}} when LC =< 0 ->
{reply, insufficient_credit, mapped, State};
{reply, {error, insufficient_credit}, mapped, State};
#{OutHandle := Link} ->
Transfer = Transfer0#'v1_0.transfer'{delivery_id = uint(NDI)},
ok = send_transfer(Transfer, Parts, State),
% TODO look into if erlang will correctly wrap integers during
% binary conversion.
{reply, ok, mapped, book_transfer_send(Link, State)};
{reply, {ok, NDI}, mapped, book_transfer_send(Link, State)};
_ ->
{reply, {error, link_not_found}, mapped, State}
end;
@ -470,6 +471,7 @@ encode_frame(Record, #state{channel = Channel}) ->
rabbit_amqp1_0_binary_generator:build_frame(Channel, Encoded).
send(Record, #state{socket = Socket} = State) ->
error_logger:info_msg("SESSION SEND ~p~n", [Record]),
Frame = encode_frame(Record, State),
gen_tcp:send(Socket, Frame).
@ -491,6 +493,7 @@ send_transfer(Transfer0, Parts0, #state{socket = Socket, channel = Channel,
MaxPayloadSize = OutMaxFrameSize - TSize - ?FRAME_HEADER_SIZE,
Frames = build_frames(Channel, Transfer0, PartsBin, MaxPayloadSize, []),
error_logger:info_msg("SESSION SEND ~p~n", [Transfer0]),
ok = gen_tcp:send(Socket, Frames).
build_frames(Channel, Trf, Bin, MaxPayloadSize, Acc)
@ -638,6 +641,10 @@ notify_session_begin(#state{owner = Owner, notify = true}) ->
ok;
notify_session_begin(_State) -> ok.
notify_disposition({Pid, _}, N) ->
Pid ! {disposition, N},
ok.
book_transfer_send(#link{output_handle = Handle} = Link,
#state{next_outgoing_id = NOI,
next_delivery_id = NDI,
@ -687,6 +694,9 @@ decode_as_msg(Transfer, Payload) ->
Records = rabbit_amqp1_0_framing:decode_bin(Payload),
amqp10_msg:from_amqp_records([Transfer | Records]).
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").

View File

@ -159,7 +159,7 @@ basic_roundtrip(Config) ->
{ok, Sender} = amqp10_client_link:sender(Session, <<"banana-sender">>,
<<"test">>),
Msg = amqp10_msg:new(<<"my-tag">>, <<"banana">>, true),
ok = amqp10_client_link:send(Sender, Msg),
{ok, _} = amqp10_client_link:send(Sender, Msg),
{ok, Receiver} = amqp10_client_link:receiver(Session, <<"banana-receiver">>,
<<"test">>),
{amqp_msg, OutMsg} = amqp10_client_link:get_one(Receiver),
@ -178,7 +178,7 @@ split_transfer(Config) ->
{ok, Sender} = amqp10_client_link:sender(Session, <<"data-sender">>,
<<"test">>),
Msg = amqp10_msg:new(<<"my-tag">>, Data, true),
ok = amqp10_client_link:send(Sender, Msg),
{ok, _} = amqp10_client_link:send(Sender, Msg),
{ok, Receiver} = amqp10_client_link:receiver(Session, <<"data-receiver">>,
<<"test">>),
{amqp_msg, OutMsg} = amqp10_client_link:get_one(Receiver),
@ -196,7 +196,8 @@ transfer_unsettled(Config) ->
{ok, Sender} = amqp10_client_link:sender(Session, <<"data-sender">>,
<<"test">>, unsettled),
Msg = amqp10_msg:new(<<"my-tag">>, Data, false),
accepted = amqp10_client_link:send(Sender, Msg),
{ok, DeliveryId} = amqp10_client_link:send(Sender, Msg),
ok = await_disposition(DeliveryId),
{ok, Receiver} = amqp10_client_link:receiver(Session, <<"data-receiver">>,
<<"test">>, unsettled),
{amqp_msg, OutMsg} = amqp10_client_link:get_one(Receiver),
@ -214,10 +215,12 @@ subscribe(Config) ->
{ok, Session} = amqp10_client_session:'begin'(Connection),
{ok, Sender} = amqp10_client_link:sender(Session, <<"sub-sender">>,
QueueName),
Msg1 = amqp10_msg:new(<<"my-tag">>, <<"banana">>, true),
Msg2 = amqp10_msg:new(<<"my-tag2">>, <<"banana">>, true),
ok = amqp10_client_link:send(Sender, Msg1),
ok = amqp10_client_link:send(Sender, Msg2),
Msg1 = amqp10_msg:new(<<"my-taggy">>, <<"banana">>, false),
Msg2 = amqp10_msg:new(<<"my-taggy2">>, <<"banana">>, false),
{ok, DeliveryId1} = amqp10_client_link:send(Sender, Msg1),
ok = await_disposition(DeliveryId1),
{ok, DeliveryId2} = amqp10_client_link:send(Sender, Msg2),
ok = await_disposition(DeliveryId2),
{ok, Receiver} = amqp10_client_link:receiver(Session, <<"sub-receiver">>,
QueueName, unsettled),
ok = amqp10_client_link:flow_credit(Receiver, 2),
@ -285,3 +288,9 @@ receive_one(Receiver) ->
after 2000 ->
timeout
end.
await_disposition(DeliveryId) ->
receive
{disposition, {DeliveryId, accepted}} -> ok
after 3000 -> exit(dispostion_timeout)
end.