AMQP 1.0: Support the modified outcome

Some client libraries (QPid) will automatically send a disposition
with the 'modified' outcome in response to a client local message TTL
expiry.

To support this case and others we treat 'modified' the same as
'accepted' and simply ack the message back to the queue.

This change also contains some API extensions to the amqp10_client
to better support sending the various delivery states (outcomes).
This commit is contained in:
Karl Nilsson 2022-10-24 15:50:32 +01:00 committed by Luke Bakken
parent 62c56ddc6b
commit 802688a8ab
No known key found for this signature in database
GPG Key ID: D99DE30E43EAE440
6 changed files with 122 additions and 23 deletions

View File

@ -32,6 +32,7 @@
detach_link/1,
send_msg/2,
accept_msg/2,
settle_msg/3,
flow_link_credit/3,
flow_link_credit/4,
echo/1,
@ -335,11 +336,18 @@ send_msg(#link_ref{role = sender, session = Session,
%% @doc Accept a message on a the link referred to be the 'LinkRef'.
-spec accept_msg(link_ref(), amqp10_msg:amqp10_msg()) -> ok.
accept_msg(#link_ref{role = receiver, session = Session}, Msg) ->
accept_msg(LinkRef, Msg) ->
settle_msg(LinkRef, Msg, accepted).
%% @doc Settle a message on a the link referred to be the 'LinkRef' using
%% the chosen delivery state.
-spec settle_msg(link_ref(), amqp10_msg:amqp10_msg(),
amqp10_client_types:delivery_state()) -> ok.
settle_msg(#link_ref{role = receiver,
session = Session}, Msg, Settlement) ->
DeliveryId = amqp10_msg:delivery_id(Msg),
amqp10_client_session:disposition(Session, receiver, DeliveryId,
DeliveryId, true, accepted).
DeliveryId, true, Settlement).
%% @doc Get a single message from a link.
%% Flows a single link credit then awaits delivery or timeout.
-spec get_msg(link_ref()) -> {ok, amqp10_msg:amqp10_msg()} | {error, timeout}.

View File

@ -20,5 +20,6 @@
-define(DBG(F, A), ok).
-endif.
-record(link_ref, {role :: sender | receiver, session :: pid(),
-record(link_ref, {role :: sender | receiver,
session :: pid(),
link_handle :: non_neg_integer()}).

View File

@ -14,14 +14,11 @@
-define(EXCHANGE_SUB_LIFETIME, "delete-on-close").
-define(DEFAULT_OUTCOME, #'v1_0.released'{}).
-define(SUPPORTED_OUTCOMES, [?V_1_0_SYMBOL_ACCEPTED,
?V_1_0_SYMBOL_REJECTED,
?V_1_0_SYMBOL_RELEASED]).
-define(OUTCOMES, [?V_1_0_SYMBOL_ACCEPTED,
?V_1_0_SYMBOL_REJECTED,
?V_1_0_SYMBOL_RELEASED,
?V_1_0_SYMBOL_MODIFIED]).
-define(SUPPORTED_OUTCOMES, ?OUTCOMES).
outcomes(Source) ->
{DefaultOutcome, Outcomes} =

View File

@ -277,6 +277,14 @@ handle_control(#'v1_0.disposition'{state = Outcome,
#'v1_0.accepted'{} ->
#'basic.ack'{delivery_tag = DeliveryTag,
multiple = false};
%% we don't care if the client modified the
%% so just treat it as accepted.
%% Some clients send modified instead of accepted
%% when e.g. a client
%% side message TTL expires.
#'v1_0.modified'{} ->
#'basic.ack'{delivery_tag = DeliveryTag,
multiple = false};
#'v1_0.rejected'{} ->
#'basic.reject'{delivery_tag = DeliveryTag,
requeue = false};

View File

@ -11,6 +11,7 @@
-include_lib("eunit/include/eunit.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
-compile(nowarn_export_all).
-compile(export_all).
all() ->
@ -22,6 +23,7 @@ all() ->
groups() ->
[
{tests, [], [
reliable_send_receive_with_outcomes,
roundtrip_quorum_queue_with_drain,
message_headers_conversion
]},
@ -68,6 +70,75 @@ end_per_testcase(Testcase, Config) ->
%%% TESTS
%%%
reliable_send_receive_with_outcomes(Config) ->
Outcomes = [accepted,
modified,
rejected,
released],
[begin
ct:pal("~s testing ~s", [?FUNCTION_NAME, Outcome]),
reliable_send_receive(Config, Outcome)
end || Outcome <- Outcomes],
ok.
reliable_send_receive(Config, Outcome) ->
Container = atom_to_binary(?FUNCTION_NAME, utf8),
OutcomeBin = atom_to_binary(Outcome, utf8),
QName = <<Container/binary, OutcomeBin/binary>>,
%% declare a quorum queue
Ch = rabbit_ct_client_helpers:open_channel(Config, 0),
amqp_channel:call(Ch, #'queue.declare'{queue = QName,
durable = true,
arguments = [{<<"x-queue-type">>,
longstr, <<"quorum">>}]}),
rabbit_ct_client_helpers:close_channel(Ch),
%% reliable send and consume
Host = ?config(rmq_hostname, Config),
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
Address = <<"/amq/queue/", QName/binary>>,
OpnConf = #{address => Host,
port => Port,
container_id => Container,
sasl => {plain, <<"guest">>, <<"guest">>}},
{ok, Connection} = amqp10_client:open_connection(OpnConf),
{ok, Session} = amqp10_client:begin_session(Connection),
SenderLinkName = <<"test-sender">>,
{ok, Sender} = amqp10_client:attach_sender_link(Session,
SenderLinkName,
Address),
ok = wait_for_credit(Sender),
DTag1 = <<"dtag-1">>,
%% create an unsettled message,
%% link will be in "mixed" mode by default
Msg1 = amqp10_msg:new(DTag1, <<"body-1">>, false),
ok = amqp10_client:send_msg(Sender, Msg1),
ok = wait_for_settlement(DTag1),
ok = amqp10_client:detach_link(Sender),
ok = amqp10_client:close_connection(Connection),
flush("post sender close"),
{ok, Connection2} = amqp10_client:open_connection(OpnConf),
{ok, Session2} = amqp10_client:begin_session(Connection2),
ReceiverLinkName = <<"test-receiver">>,
{ok, Receiver} = amqp10_client:attach_receiver_link(Session2,
ReceiverLinkName,
Address,
unsettled),
{ok, Msg} = amqp10_client:get_msg(Receiver),
ct:pal("got ~p", [amqp10_msg:body(Msg)]),
ok = amqp10_client:settle_msg(Receiver, Msg, Outcome),
flush("post accept"),
ok = amqp10_client:detach_link(Receiver),
ok = amqp10_client:close_connection(Connection2),
ok.
roundtrip_quorum_queue_with_drain(Config) ->
Host = ?config(rmq_hostname, Config),
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
@ -83,7 +154,7 @@ roundtrip_quorum_queue_with_drain(Config) ->
port => Port,
container_id => atom_to_binary(?FUNCTION_NAME, utf8),
sasl => {plain, <<"guest">>, <<"guest">>}},
{ok, Connection} = amqp10_client:open_connection(OpnConf),
{ok, Session} = amqp10_client:begin_session(Connection),
SenderLinkName = <<"test-sender">>,
@ -141,18 +212,18 @@ message_headers_conversion(Config) ->
amqp_channel:call(Ch, #'queue.declare'{queue = QName,
durable = true,
arguments = [{<<"x-queue-type">>, longstr, <<"quorum">>}]}),
rabbit_ct_broker_helpers:rpc(Config, 0, application, set_env,[rabbitmq_amqp1_0, convert_amqp091_headers_to_app_props, true]),
rabbit_ct_broker_helpers:rpc(Config, 0, application, set_env,[rabbitmq_amqp1_0, convert_app_props_to_amqp091_headers, true]),
OpnConf = #{address => Host,
port => Port,
container_id => atom_to_binary(?FUNCTION_NAME, utf8),
sasl => {plain, <<"guest">>, <<"guest">>}},
{ok, Connection} = amqp10_client:open_connection(OpnConf),
{ok, Session} = amqp10_client:begin_session(Connection),
amqp10_to_amqp091_header_conversion(Session, Ch, QName, Address),
amqp091_to_amqp10_header_conversion(Session, Ch, QName, Address),
@ -173,7 +244,7 @@ amqp10_to_amqp091_header_conversion(Session,Ch, QName, Address) ->
wait_for_accepts(1),
{ok, Headers} = amqp091_get_msg_headers(Ch, QName),
?assertEqual({bool, true}, rabbit_misc:table_lookup(Headers, <<"x-bool">>)),
?assertEqual({unsignedint, 3}, rabbit_misc:table_lookup(Headers, <<"x-int">>)),
?assertEqual({longstr, <<"string-value">>}, rabbit_misc:table_lookup(Headers, <<"x-string">>)).
@ -252,22 +323,35 @@ open_and_close_connection(OpnConf) ->
ok = amqp10_client:close_connection(Connection).
% before we can send messages we have to wait for credit from the server
wait_for_credit(Sender) ->
wait_for_credit(Sender) ->
receive
{amqp10_event, {link, Sender, credited}} ->
{amqp10_event, {link, Sender, credited}} ->
flush(?FUNCTION_NAME),
ok
after 5000 ->
flush("Credit timed out"),
exit(credited_timeout)
flush("wait_for_credit timed out"),
ct:fail(credited_timeout)
end.
wait_for_settlement(Tag) ->
receive
{amqp10_disposition, {accepted, Tag}} ->
flush(?FUNCTION_NAME),
ok
after 5000 ->
flush("wait_for_settlement timed out"),
ct:fail(credited_timeout)
end.
wait_for_accepts(0) -> ok;
wait_for_accepts(N) ->
receive
{amqp10_disposition,{accepted,_}} -> wait_for_accepts(N -1)
after 250 ->
ok
wait_for_accepts(N) ->
receive
{amqp10_disposition,{accepted,_}} ->
wait_for_accepts(N -1)
after 250 ->
ok
end.
delete_queue(Config, QName) ->
Ch = rabbit_ct_client_helpers:open_channel(Config, 0),
_ = amqp_channel:call(Ch, #'queue.delete'{queue = QName}),

View File

@ -190,6 +190,7 @@ module Test =
let q = "roundtrip-091-q"
let corr = "corrlation"
let sender = SenderLink(c.Session, q + "-sender" , q)
new Message("hi"B, Header = Header(),
Properties = new Properties(CorrelationId = corr))
|> sender.Send