Do not expose delivery ids in api

The user should only have to keep track of the
user-provided deliery tag.
This commit is contained in:
kjnilsson 2017-03-24 16:19:43 +00:00
parent bb7682ce4b
commit a8ec136090
7 changed files with 53 additions and 52 deletions

View File

@ -7,10 +7,8 @@ DEPS = amqp10_common
TEST_DEPS = rabbit rabbitmq_amqp1_0 rabbitmq_ct_helpers
DEP_PLUGINS = rabbit_common/mk/rabbitmq-plugin.mk elvis_mk
dep_elvis_mk = git https://github.com/inaka/elvis.mk.git master
# FIXME: Use erlang.mk patched for RabbitMQ, while waiting for PRs to be
# reviewed and merged.
@ -20,6 +18,9 @@ ERLANG_MK_COMMIT = rabbitmq-tmp
include rabbitmq-components.mk
include erlang.mk
# dialyze the tests
DIALYZER_OPTS += --src -r test
# --------------------------------------------------------------------
# ActiveMQ for the testsuite.
# --------------------------------------------------------------------

View File

@ -5,21 +5,11 @@
-define(FRAME_HEADER_SIZE, 8).
-define(debug, true).
-ifdef(debug).
-define(DBG(F, A), error_logger:info_msg(F, A)).
-else.
-define(DBG(F, A), ok).
-endif.

View File

@ -31,8 +31,12 @@
link_handle :: non_neg_integer(), link_name :: binary()}).
-opaque link_ref() :: #link_ref{}.
-export_type([link_ref/0
-type result(Succ, Err) :: {ok, Succ} | {error, Err}.
-export_type([link_ref/0,
result/2
]).
-spec open_connection(
inet:socket_address() | inet:hostname(),
inet:port_number()) -> supervisor:startchild_ret().
@ -92,6 +96,7 @@ attach_sender_link(Session, Name, Target, SettleMode) ->
snd_settle_mode => SettleMode,
rcv_settle_mode => first},
% TODO: work out what kind of errors may happen during attach
{ok, Attach} = amqp10_client_session:attach(Session, AttachArgs),
{ok, make_link_ref(sender, Session, Name, Attach)}.
@ -124,7 +129,7 @@ flow_link_credit(#link_ref{role = receiver, session = Session,
% else it returns the delivery state from the disposition
% TODO: timeouts
-spec send_msg(link_ref(), amqp10_msg:amqp10_msg()) ->
{ok, non_neg_integer()} | {error, insufficient_credit | link_not_found}.
ok | {error, insufficient_credit | link_not_found}.
send_msg(#link_ref{role = sender, session = Session,
link_handle = Handle}, Msg0) ->
Msg = amqp10_msg:set_handle(Handle, Msg0),
@ -136,8 +141,7 @@ accept_msg(#link_ref{role = receiver, session = Session}, Msg) ->
amqp10_client_session:disposition(Session, receiver, DeliveryId,
DeliveryId, true, accepted).
-spec get_msg(link_ref()) ->
{ok, amqp10_msg:amqp10_msg()} | {error, timeout}.
-spec get_msg(link_ref()) -> {ok, amqp10_msg:amqp10_msg()} | {error, timeout}.
get_msg(LinkRef) ->
get_msg(LinkRef, ?DEFAULT_TIMEOUT).

View File

@ -106,18 +106,15 @@ set_other_procs(Pid, OtherProcs) ->
gen_fsm:send_all_state_event(Pid, {set_other_procs, OtherProcs}).
-spec socket_ready(pid(), gen_tcp:socket()) -> ok.
socket_ready(Pid, Socket) ->
gen_fsm:send_event(Pid, {socket_ready, Socket}).
-spec protocol_header_received(pid(), 0 | 3, non_neg_integer(), non_neg_integer(),
non_neg_integer()) -> ok.
-spec protocol_header_received(pid(), 0 | 3, non_neg_integer(),
non_neg_integer(), non_neg_integer()) -> ok.
protocol_header_received(Pid, Protocol, Maj, Min, Rev) ->
gen_fsm:send_event(Pid, {protocol_header_received, Protocol, Maj, Min, Rev}).
-spec begin_session(pid()) -> supervisor:startchild_ret().
begin_session(Pid) ->
gen_fsm:sync_send_all_state_event(Pid, begin_session).

View File

@ -103,7 +103,7 @@
connection_config = #{} :: amqp10_client_connection:connection_config(),
% the unsettled map needs to go in the session state as a disposition
% can reference transfers for many different links
unsettled = #{} :: #{transfer_id() => {link_handle(), any()}}, %TODO: refine as FsmRef
unsettled = #{} :: #{transfer_id() => {amqp10_client:delivery_tag(), any()}}, %TODO: refine as FsmRef
incoming_unsettled = #{} :: #{transfer_id() => link_handle()},
notify :: pid()
}).
@ -142,7 +142,7 @@ attach(Session, Args) ->
gen_fsm:sync_send_event(Session, {attach, Args}).
-spec transfer(pid(), amqp10_msg:amqp10_msg(), timeout()) ->
{ok, non_neg_integer()} | {error, insufficient_credit | link_not_found}.
ok | {error, insufficient_credit | link_not_found}.
transfer(Session, Amqp10Msg, Timeout) ->
[Transfer | Records] = amqp10_msg:to_amqp_records(Amqp10Msg),
gen_fsm:sync_send_event(Session, {transfer, Transfer, Records}, Timeout).
@ -345,9 +345,10 @@ mapped(#'v1_0.disposition'{role = true, settled = true, first = {uint, First},
Unsettled =
lists:foldl(fun(Id, Acc) ->
case Acc of
#{Id := {_Handle, Receiver}} ->
#{Id := {DeliveryTag, Receiver}} ->
S = translate_delivery_state(DeliveryState),
ok = notify_disposition(Receiver, {S, Id}),
ok = notify_disposition(Receiver,
{S, DeliveryTag}),
maps:remove(Id, Acc);
_ -> Acc
end
@ -365,6 +366,7 @@ mapped(Frame, State) ->
%%
%% Transfer. See spec section: 2.6.12
mapped({transfer, #'v1_0.transfer'{handle = {uint, OutHandle},
delivery_tag = {binary, DeliveryTag},
settled = false} = Transfer0, Parts}, From,
#state{next_delivery_id = NDI, links = Links} = State) ->
@ -375,9 +377,9 @@ mapped({transfer, #'v1_0.transfer'{handle = {uint, OutHandle},
Transfer = Transfer0#'v1_0.transfer'{delivery_id = uint(NDI)},
ok = send_transfer(Transfer, Parts, State),
% delay reply to caller until disposition frame is received
State1 = State#state{unsettled = #{NDI => {OutHandle, From}}},
State1 = State#state{unsettled = #{NDI => {DeliveryTag, From}}},
% {next_state, mapped, book_transfer_send(Link, State1)};
{reply, {ok, NDI}, mapped, book_transfer_send(Link, State1)};
{reply, ok, mapped, book_transfer_send(Link, State1)};
_ ->
{reply, {error, link_not_found}, mapped, State}
end;
@ -393,7 +395,7 @@ mapped({transfer, #'v1_0.transfer'{handle = {uint, OutHandle}} = Transfer0,
ok = send_transfer(Transfer, Parts, State),
% TODO look into if erlang will correctly wrap integers during
% binary conversion.
{reply, {ok, NDI}, mapped, book_transfer_send(Link, State)};
{reply, ok, mapped, book_transfer_send(Link, State)};
_ ->
{reply, {error, link_not_found}, mapped, State}
end;

View File

@ -31,6 +31,7 @@
-type maybe(T) :: T | undefined.
-type delivery_tag() :: binary().
-type content_type() :: term(). % TODO: refine
-type content_encoding() :: term(). % TODO: refine
@ -82,7 +83,8 @@
-export_type([amqp10_msg/0,
amqp10_header/0,
amqp10_properties/0,
amqp10_body/0
amqp10_body/0,
delivery_tag/0
]).
-define(record_to_tuplelist(Rec, Ref),
@ -101,7 +103,7 @@ from_amqp_records([#'v1_0.transfer'{} = Transfer | Records]) ->
to_amqp_records(#amqp10_msg{transfer = T, body = B}) ->
lists:flatten([T, B]).
-spec delivery_tag(amqp10_msg()) -> binary().
-spec delivery_tag(amqp10_msg()) -> delivery_tag().
delivery_tag(#amqp10_msg{transfer = #'v1_0.transfer'{delivery_tag = Tag}}) ->
unpack(Tag).
@ -211,7 +213,7 @@ body(#amqp10_msg{body = [#'v1_0.data'{} | _] = Data}) ->
body(#amqp10_msg{body = Body}) -> Body.
-spec new(binary(), amqp10_body() | binary(), boolean()) -> amqp10_msg().
-spec new(delivery_tag(), amqp10_body() | binary(), boolean()) -> amqp10_msg().
new(DeliveryTag, Body, Settled) when is_binary(Body) ->
#amqp10_msg{transfer = #'v1_0.transfer'{delivery_tag = {binary, DeliveryTag},
settled = Settled},
@ -221,7 +223,7 @@ new(DeliveryTag, Body, Settled) ->
settled = Settled},
body = Body}.
-spec new(binary(), amqp10_body() | binary()) -> amqp10_msg().
-spec new(delivery_tag(), amqp10_body() | binary()) -> amqp10_msg().
new(DeliveryTag, Body) ->
new(DeliveryTag, Body, false).

View File

@ -167,9 +167,10 @@ basic_roundtrip(Config) ->
<<"banana-sender">>,
<<"test">>),
Msg = amqp10_msg:new(<<"my-tag">>, <<"banana">>, true),
{ok, _} = amqp10_client:send_msg(Sender, Msg),
{ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"banana-receiver">>,
<<"test">>),
ok = amqp10_client:send_msg(Sender, Msg),
{ok, Receiver} = amqp10_client:attach_receiver_link(Session,
<<"banana-receiver">>,
<<"test">>),
{ok, OutMsg} = amqp10_client:get_msg(Receiver),
ok = amqp10_client:end_session(Session),
ok = amqp10_client:close_connection(Connection),
@ -185,11 +186,12 @@ split_transfer(Config) ->
{ok, Session} = amqp10_client:begin_session(Connection),
Data = list_to_binary(string:chars(64, 1000)),
{ok, Sender} = amqp10_client:attach_sender_link(Session, <<"data-sender">>,
<<"test">>),
<<"test">>),
Msg = amqp10_msg:new(<<"my-tag">>, Data, true),
{ok, _} = amqp10_client:send_msg(Sender, Msg),
{ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"data-receiver">>,
<<"test">>),
ok = amqp10_client:send_msg(Sender, Msg),
{ok, Receiver} = amqp10_client:attach_receiver_link(Session,
<<"data-receiver">>,
<<"test">>),
{ok, OutMsg} = amqp10_client:get_msg(Receiver),
ok = amqp10_client:end_session(Session),
ok = amqp10_client:close_connection(Connection),
@ -204,9 +206,10 @@ transfer_unsettled(Config) ->
Data = list_to_binary(string:chars(64, 1000)),
{ok, Sender} = amqp10_client:attach_sender_link(Session, <<"data-sender">>,
<<"test">>, unsettled),
Msg = amqp10_msg:new(<<"my-tag">>, Data, false),
{ok, DeliveryId} = amqp10_client:send_msg(Sender, Msg),
ok = await_disposition(DeliveryId),
DeliveryTag = <<"my-tag">>,
Msg = amqp10_msg:new(DeliveryTag, Data, false),
ok = amqp10_client:send_msg(Sender, Msg),
ok = await_disposition(DeliveryTag),
{ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"data-receiver">>,
<<"test">>, unsettled),
{ok, OutMsg} = amqp10_client:get_msg(Receiver),
@ -224,12 +227,14 @@ subscribe(Config) ->
{ok, Session} = amqp10_client:begin_session(Connection),
{ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sub-sender">>,
QueueName),
Msg1 = amqp10_msg:new(<<"my-taggy">>, <<"banana">>, false),
Msg2 = amqp10_msg:new(<<"my-taggy2">>, <<"banana">>, false),
{ok, DeliveryId1} = amqp10_client:send_msg(Sender, Msg1),
ok = await_disposition(DeliveryId1),
{ok, DeliveryId2} = amqp10_client:send_msg(Sender, Msg2),
ok = await_disposition(DeliveryId2),
Tag1 = <<"t1">>,
Tag2 = <<"t2">>,
Msg1 = amqp10_msg:new(Tag1, <<"banana">>, false),
Msg2 = amqp10_msg:new(Tag2, <<"banana">>, false),
ok = amqp10_client:send_msg(Sender, Msg1),
ok = await_disposition(Tag1),
ok = amqp10_client:send_msg(Sender, Msg2),
ok = await_disposition(Tag2),
{ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"sub-receiver">>,
QueueName, unsettled),
ok = amqp10_client:flow_link_credit(Receiver, 2),
@ -337,8 +342,8 @@ receive_one(Receiver) ->
timeout
end.
await_disposition(DeliveryId) ->
await_disposition(DeliveryTag) ->
receive
{amqp10_disposition, {accepted, DeliveryId}} -> ok
{amqp10_disposition, {accepted, DeliveryTag}} -> ok
after 3000 -> exit(dispostion_timeout)
end.