Settle unroutable message with released state

Currently it is not possible for an AMQP 1.0 to make find
out a message is unroutable as it is settled with the
accepted state. This is because the current implementation
relies only on the publish confirms AMQP 091 extension.

This commit changes this by settling the message with the
released state. It uses the mandatory flag mechanism from
AMQP 091 and "internally" extends it to provide not only
the message in the callback, but the publishing sequence
as well. This applies only for AMQP 1.0, not for other
cases. Publish confirms and the mandatory flag are used
in conjonction in this case.

References #7823
This commit is contained in:
Arnaud Cogoluègnes 2023-04-27 10:56:32 +02:00
parent 5a17c86df3
commit d8f77c5882
No known key found for this signature in database
GPG Key ID: D5C8C4DFAD43AFA8
7 changed files with 135 additions and 20 deletions

View File

@ -886,6 +886,9 @@ flush_writer(#state{driver = direct}) ->
ok.
amqp_msg(none) ->
none;
amqp_msg({DTag, Content}) ->
{Props, Payload} = rabbit_basic_common:from_content(Content),
{DTag, #amqp_msg{props = Props, payload = Payload}};
amqp_msg(Content) ->
{Props, Payload} = rabbit_basic_common:from_content(Content),
#amqp_msg{props = Props, payload = Payload}.

View File

@ -111,7 +111,10 @@
consumer_timeout,
authz_context,
%% defines how ofter gc will be executed
writer_gc_threshold
writer_gc_threshold,
%% true with AMQP 1.0 to include the publishing sequence
%% in the return callback, false otherwise
extended_return_callback
}).
-record(pending_ack, {
@ -518,6 +521,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
MaxMessageSize = get_max_message_size(),
ConsumerTimeout = get_consumer_timeout(),
OptionalVariables = extract_variable_map_from_amqp_params(AmqpParams),
UseExtendedReturnCallback = use_extended_return_callback(AmqpParams),
{ok, GCThreshold} = application:get_env(rabbit, writer_gc_threshold),
State = #ch{cfg = #conf{state = starting,
protocol = Protocol,
@ -536,7 +540,8 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
max_message_size = MaxMessageSize,
consumer_timeout = ConsumerTimeout,
authz_context = OptionalVariables,
writer_gc_threshold = GCThreshold
writer_gc_threshold = GCThreshold,
extended_return_callback = UseExtendedReturnCallback
},
limiter = Limiter,
tx = none,
@ -1076,6 +1081,15 @@ extract_variable_map_from_amqp_params([Value]) ->
extract_variable_map_from_amqp_params(_) ->
#{}.
%% Use tuple representation of amqp_params to avoid a dependency on amqp_client.
%% Used for AMQP 1.0
use_extended_return_callback({amqp_params_direct,_,_,_,_,
{amqp_adapter_info,_,_,_,_,_,{'AMQP',"1.0"},_},
_}) ->
true;
use_extended_return_callback(_) ->
false.
check_msg_size(Content, MaxMessageSize, GCThreshold) ->
Size = rabbit_basic:maybe_gc_large_msg(Content, GCThreshold),
case Size of
@ -1917,9 +1931,8 @@ binding_action(Fun, SourceNameBin0, DestinationType, DestinationNameBin0,
ok
end.
basic_return(#basic_message{exchange_name = ExchangeName,
routing_keys = [RoutingKey | _CcRoutes],
content = Content},
basic_return(Content, #basic_message{exchange_name = ExchangeName,
routing_keys = [RoutingKey | _CcRoutes]},
State = #ch{cfg = #conf{protocol = Protocol,
writer_pid = WriterPid}},
Reason) ->
@ -2154,7 +2167,9 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ex
mandatory = Mandatory,
confirm = Confirm,
msg_seq_no = MsgSeqNo},
RoutedToQueueNames = [QName]}, State0 = #ch{queue_states = QueueStates0}) -> %% optimisation when there is one queue
RoutedToQueueNames = [QName]},
State0 = #ch{cfg = #conf{extended_return_callback = ExtendedReturnCallback},
queue_states = QueueStates0}) -> %% optimisation when there is one queue
Qs0 = rabbit_amqqueue:lookup_many(RoutedToQueueNames),
Qs = rabbit_amqqueue:prepend_extra_bcc(Qs0),
case rabbit_queue_type:deliver(Qs, Delivery, QueueStates0) of
@ -2162,7 +2177,7 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ex
rabbit_global_counters:messages_routed(amqp091, erlang:min(1, length(Qs))),
%% NB: the order here is important since basic.returns must be
%% sent before confirms.
ok = process_routing_mandatory(Mandatory, Qs, Message, State0),
ok = process_routing_mandatory(ExtendedReturnCallback, Mandatory, Qs, MsgSeqNo, Message, State0),
QueueNames = rabbit_amqqueue:queue_names(Qs),
State1 = process_routing_confirm(Confirm, QueueNames, MsgSeqNo, XName, State0),
%% Actions must be processed after registering confirms as actions may
@ -2191,7 +2206,9 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ex
mandatory = Mandatory,
confirm = Confirm,
msg_seq_no = MsgSeqNo},
RoutedToQueueNames}, State0 = #ch{queue_states = QueueStates0}) ->
RoutedToQueueNames},
State0 = #ch{cfg = #conf{extended_return_callback = ExtendedReturnCallback},
queue_states = QueueStates0}) ->
Qs0 = rabbit_amqqueue:lookup_many(RoutedToQueueNames),
Qs = rabbit_amqqueue:prepend_extra_bcc(Qs0),
case rabbit_queue_type:deliver(Qs, Delivery, QueueStates0) of
@ -2199,7 +2216,7 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ex
rabbit_global_counters:messages_routed(amqp091, length(Qs)),
%% NB: the order here is important since basic.returns must be
%% sent before confirms.
ok = process_routing_mandatory(Mandatory, Qs, Message, State0),
ok = process_routing_mandatory(ExtendedReturnCallback, Mandatory, Qs, MsgSeqNo, Message, State0),
QueueNames = rabbit_amqqueue:queue_names(Qs),
State1 = process_routing_confirm(Confirm, QueueNames,
MsgSeqNo, XName, State0),
@ -2222,19 +2239,32 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ex
[rabbit_misc:rs(Resource)])
end.
process_routing_mandatory(_Mandatory = true,
process_routing_mandatory(_ExtendedReturnCallback = false,
_Mandatory = true,
_RoutedToQs = [],
Msg, State) ->
_MsgSeqNo,
#basic_message{content = Content} = Msg, State) ->
rabbit_global_counters:messages_unroutable_returned(amqp091, 1),
ok = basic_return(Msg, State, no_route),
ok = basic_return(Content, Msg, State, no_route),
ok;
process_routing_mandatory(_Mandatory = false,
process_routing_mandatory(_ExtendedReturnCallback = true,
_Mandatory = true,
_RoutedToQs = [],
MsgSeqNo,
#basic_message{content = Content} = Msg, State) ->
rabbit_global_counters:messages_unroutable_returned(amqp091, 1),
%% providing the publishing sequence for AMQP 1.0
ok = basic_return({MsgSeqNo, Content}, Msg, State, no_route),
ok;
process_routing_mandatory(_ExtendedReturnCallback,
_Mandatory = false,
_RoutedToQs = [],
_MsgSeqNo,
#basic_message{exchange_name = ExchangeName}, State) ->
rabbit_global_counters:messages_unroutable_dropped(amqp091, 1),
?INCR_STATS(exchange_stats, ExchangeName, 1, drop_unroutable, State),
ok;
process_routing_mandatory(_, _, _, _) ->
process_routing_mandatory(_, _, _, _, _, _) ->
ok.
process_routing_confirm(false, _, _, _, State) ->

View File

@ -105,7 +105,10 @@
-spec send_command(pid(), rabbit_framing:amqp_method_record()) -> 'ok'.
-spec send_command
(pid(), rabbit_framing:amqp_method_record(), rabbit_types:content()) ->
(pid(), rabbit_framing:amqp_method_record(),
rabbit_types:content() |
{integer(), rabbit_types:content()} %% publishing sequence for AMQP 1.0 return callback
) ->
'ok'.
-spec send_command_sync(pid(), rabbit_framing:amqp_method_record()) -> 'ok'.
-spec send_command_sync

View File

@ -17,7 +17,7 @@
%% Just make these constant for the time being.
-define(INCOMING_CREDIT, 65536).
-record(incoming_link, {name, exchange, routing_key,
-record(incoming_link, {name, exchange, routing_key, mandatory,
delivery_id = undefined,
delivery_count = 0,
send_settle_mode = undefined,
@ -53,6 +53,7 @@ attach(#'v1_0.attach'{name = Name,
SndSettleMode == ?V_1_0_SENDER_SETTLE_MODE_MIXED ->
amqp_channel:register_confirm_handler(BCh, self()),
rabbit_amqp1_0_channel:call(BCh, #'confirm.select'{}),
amqp_channel:register_return_handler(BCh, self()),
true
end,
Flow = #'v1_0.flow'{ handle = Handle,
@ -69,7 +70,8 @@ attach(#'v1_0.attach'{name = Name,
initial_delivery_count = undefined, % must be, I am the receiver
role = ?RECV_ROLE}, %% server is receiver
IncomingLink1 =
IncomingLink#incoming_link{recv_settle_mode = RcvSettleMode},
IncomingLink#incoming_link{recv_settle_mode = RcvSettleMode,
mandatory = Confirm},
{ok, [Attach, Flow], IncomingLink1, Confirm};
{error, Reason} ->
%% TODO proper link establishment protocol here?
@ -142,7 +144,8 @@ transfer(#'v1_0.transfer'{delivery_id = DeliveryId0,
end,
rabbit_amqp1_0_channel:cast_flow(
BCh, #'basic.publish'{exchange = X,
routing_key = RKey}, Msg),
routing_key = RKey,
mandatory = true}, Msg),
{SendFlow, CreditUsed1} = case CreditUsed - 1 of
C when C =< 0 ->
{true, ?INCOMING_CREDIT div 2};

View File

@ -14,7 +14,7 @@
incr_incoming_id/1, next_delivery_id/1, transfers_left/1,
record_transfers/2, bump_outgoing_window/1,
record_outgoing/4, settle/3, flow_fields/2, channel/1,
flow/2, ack/2, validate_attach/1]).
flow/2, ack/2, return/2, validate_attach/1]).
-import(rabbit_amqp1_0_util, [protocol_error/3,
serial_add/2, serial_diff/2, serial_compare/2]).
@ -396,3 +396,25 @@ acknowledgement(DeliveryIds, Disposition) ->
last = {uint, lists:last(DeliveryIds)},
settled = true,
state = #'v1_0.accepted'{} }.
return(DTag, Session = #session{incoming_unsettled_map = Unsettled}) ->
{DeliveryId,
Unsettled1} = case gb_trees:lookup(DTag, Unsettled) of
{value, #incoming_delivery{ delivery_id = Id }} ->
{Id, gb_trees:delete(DTag, Unsettled)};
none ->
{undefined, Unsettled}
end,
Disposition = case DeliveryId of
undefined -> undefined;
_ -> release(DeliveryId,
#'v1_0.disposition'{role = ?RECV_ROLE})
end,
{Disposition,
Session#session{incoming_unsettled_map = Unsettled1}}.
release(DeliveryId, Disposition) ->
Disposition#'v1_0.disposition'{ first = {uint, DeliveryId},
last = {uint, DeliveryId},
settled = true,
state = #'v1_0.released'{} }.

View File

@ -130,6 +130,24 @@ handle_info(#'basic.ack'{} = Ack, State = #state{writer_pid = WriterPid,
F <- rabbit_amqp1_0_session:flow_fields(Reply, Session)],
{noreply, state(Session1, State)};
handle_info({#'basic.return'{}, {DTag, _Msg}}, State = #state{writer_pid = WriterPid,
session = Session}) ->
{Reply, Session1} = rabbit_amqp1_0_session:return(DTag, Session),
case Reply of
undefined ->
ok;
_ ->
rabbit_amqp1_0_writer:send_command(
WriterPid,
rabbit_amqp1_0_session:flow_fields(Reply, Session)
)
end,
{noreply, state(Session1, State)};
handle_info({#'basic.return'{}, _Msg}, State = #state{session = Session}) ->
rabbit_log:warning("AMQP 1.0 message return without publishing sequence"),
{noreply, state(Session, State)};
handle_info({bump_credit, Msg}, State) ->
credit_flow:handle_bump_msg(Msg),
{noreply, State};

View File

@ -24,6 +24,7 @@ groups() ->
[
{tests, [], [
reliable_send_receive_with_outcomes,
publishing_to_non_existing_queue_should_settle_with_released,
roundtrip_classic_queue_with_drain,
roundtrip_quorum_queue_with_drain,
roundtrip_stream_queue_with_drain,
@ -151,6 +152,38 @@ reliable_send_receive(Config, Outcome) ->
ok.
publishing_to_non_existing_queue_should_settle_with_released(Config) ->
Container = atom_to_binary(?FUNCTION_NAME, utf8),
Suffix = <<"foo">>,
%% does not exist
QName = <<Container/binary, Suffix/binary>>,
Host = ?config(rmq_hostname, Config),
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
Address = <<"/amq/queue/", QName/binary>>,
OpnConf = #{address => Host,
port => Port,
container_id => Container,
sasl => {plain, <<"guest">>, <<"guest">>}},
{ok, Connection} = amqp10_client:open_connection(OpnConf),
{ok, Session} = amqp10_client:begin_session(Connection),
SenderLinkName = <<"test-sender">>,
{ok, Sender} = amqp10_client:attach_sender_link(Session,
SenderLinkName,
Address),
ok = wait_for_credit(Sender),
DTag1 = <<"dtag-1">>,
%% create an unsettled message,
%% link will be in "mixed" mode by default
Msg1 = amqp10_msg:new(DTag1, <<"body-1">>, false),
ok = amqp10_client:send_msg(Sender, Msg1),
ok = wait_for_settlement(DTag1, released),
ok = amqp10_client:detach_link(Sender),
ok = amqp10_client:close_connection(Connection),
flush("post sender close"),
ok.
roundtrip_classic_queue_with_drain(Config) ->
QName = atom_to_binary(?FUNCTION_NAME, utf8),
roundtrip_queue_with_drain(Config, <<"classic">>, QName).
@ -371,8 +404,11 @@ wait_for_credit(Sender) ->
end.
wait_for_settlement(Tag) ->
wait_for_settlement(Tag, accepted).
wait_for_settlement(Tag, State) ->
receive
{amqp10_disposition, {accepted, Tag}} ->
{amqp10_disposition, {State, Tag}} ->
flush(?FUNCTION_NAME),
ok
after 5000 ->