Merge pull request #11412 from rabbitmq/link-error

Prefer link error over session error
This commit is contained in:
Michael Klishin 2024-06-07 11:10:48 -04:00 committed by GitHub
commit b9f387b988
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 64 additions and 55 deletions

View File

@ -2305,12 +2305,8 @@ incoming_link_transfer(
"delivery_tag=~p, delivery_id=~p, reason=~p",
[DeliveryTag, DeliveryId, Reason])
end;
{error, not_found, XName} ->
{error, #'v1_0.error'{} = Err} ->
Disposition = released(DeliveryId),
Description = unicode:characters_to_binary("no " ++ rabbit_misc:rs(XName)),
Err = #'v1_0.error'{
condition = ?V_1_0_AMQP_ERROR_NOT_FOUND,
description = {utf8, Description}},
Detach = detach(HandleInt, Link0, Err),
{error, [Disposition, Detach]}
end.
@ -2322,7 +2318,7 @@ lookup_target(#resource{} = XName, LinkRKey, Mc, _, _, PermCache) ->
{ok, X} ->
lookup_routing_key(X, LinkRKey, Mc, PermCache);
{error, not_found} ->
{error, not_found, XName}
{error, error_not_found(XName)}
end;
lookup_target(to, to, Mc, Vhost, User, PermCache0) ->
case mc:property(to, Mc) of
@ -2336,19 +2332,19 @@ lookup_target(to, to, Mc, Vhost, User, PermCache0) ->
check_internal_exchange(X),
lookup_routing_key(X, RKey, Mc, PermCache);
{error, not_found} ->
{error, not_found, XName}
{error, error_not_found(XName)}
end;
{error, bad_address} ->
protocol_error(
?V_1_0_AMQP_ERROR_PRECONDITION_FAILED,
"bad 'to' address string: ~ts",
[String])
{error,
#'v1_0.error'{
condition = ?V_1_0_AMQP_ERROR_PRECONDITION_FAILED,
description = {utf8, <<"bad 'to' address string: ", String/binary>>}}}
end;
undefined ->
protocol_error(
?V_1_0_AMQP_ERROR_PRECONDITION_FAILED,
"anonymous terminus requires 'to' address to be set",
[])
{error,
#'v1_0.error'{
condition = ?V_1_0_AMQP_ERROR_PRECONDITION_FAILED,
description = {utf8, <<"anonymous terminus requires 'to' address to be set">>}}}
end.
lookup_routing_key(X = #exchange{name = #resource{name = XNameBin}},
@ -2430,7 +2426,7 @@ maybe_grant_mgmt_link_credit(Credit, _, _) ->
{ok, rabbit_amqqueue:name(), permission_cache(), topic_permission_cache()} |
{error, term()}.
ensure_source(#'v1_0.source'{dynamic = true}, _, _, _, _) ->
not_implemented("Dynamic sources not supported");
exit_not_implemented("Dynamic sources not supported");
ensure_source(#'v1_0.source'{address = Address,
durable = Durable},
Vhost, User, PermCache, TopicPermCache) ->
@ -2504,7 +2500,7 @@ ensure_source_v2(Address, _, _, _) ->
permission_cache()} |
{error, term()}.
ensure_target(#'v1_0.target'{dynamic = true}, _, _, _) ->
not_implemented("Dynamic targets not supported");
exit_not_implemented("Dynamic targets not supported");
ensure_target(#'v1_0.target'{address = Address,
durable = Durable},
Vhost, User, PermCache) ->
@ -2549,7 +2545,7 @@ check_exchange(XNameBin, RKey, QNameBin, User, Vhost, PermCache0) ->
end,
{ok, Exchange, RKey, QNameBin, PermCache};
{error, not_found} ->
not_found(XName)
exit_not_found(XName)
end.
ensure_target_v1({utf8, Address}, Vhost, User, Durable, PermCache0) ->
@ -2908,16 +2904,16 @@ keyfind_unpack_described(Key, KvList) ->
end.
validate_attach(#'v1_0.attach'{target = #'v1_0.coordinator'{}}) ->
not_implemented("Transactions not supported");
exit_not_implemented("Transactions not supported");
validate_attach(#'v1_0.attach'{unsettled = {map, [_|_]}}) ->
not_implemented("Link recovery not supported");
exit_not_implemented("Link recovery not supported");
validate_attach(#'v1_0.attach'{incomplete_unsettled = true}) ->
not_implemented("Link recovery not supported");
exit_not_implemented("Link recovery not supported");
validate_attach(
#'v1_0.attach'{snd_settle_mode = SndSettleMode,
rcv_settle_mode = ?V_1_0_RECEIVER_SETTLE_MODE_SECOND})
when SndSettleMode =/= ?V_1_0_SENDER_SETTLE_MODE_SETTLED ->
not_implemented("rcv-settle-mode second not supported");
exit_not_implemented("rcv-settle-mode second not supported");
validate_attach(#'v1_0.attach'{}) ->
ok.
@ -2951,7 +2947,7 @@ validate_multi_transfer_settled(Other, First)
%% "If the message is being sent settled by the sender,
%% the value of this field [rcv-settle-mode] is ignored." [2.7.5]
validate_transfer_rcv_settle_mode(?V_1_0_RECEIVER_SETTLE_MODE_SECOND, _Settled = false) ->
not_implemented("rcv-settle-mode second not supported");
exit_not_implemented("rcv-settle-mode second not supported");
validate_transfer_rcv_settle_mode(_, _) ->
ok.
@ -3025,7 +3021,7 @@ exit_if_absent(ResourceName = #resource{kind = Kind}) ->
end,
case Mod:exists(ResourceName) of
true -> ok;
false -> not_found(ResourceName)
false -> exit_not_found(ResourceName)
end.
generate_queue_name() ->
@ -3072,10 +3068,10 @@ outcomes(#'v1_0.source'{outcomes = {array, symbol, Syms} = Outcomes}) ->
[] ->
Outcomes;
Unsupported ->
not_implemented("Outcomes not supported: ~tp", [Unsupported])
exit_not_implemented("Outcomes not supported: ~tp", [Unsupported])
end;
outcomes(#'v1_0.source'{outcomes = Unsupported}) ->
not_implemented("Outcomes not supported: ~tp", [Unsupported]);
exit_not_implemented("Outcomes not supported: ~tp", [Unsupported]);
outcomes(_) ->
{array, symbol, ?OUTCOMES}.
@ -3330,30 +3326,37 @@ check_paired({map, Properties}) ->
true ->
ok;
false ->
property_paired_not_set()
exit_property_paired_not_set()
end;
check_paired(_) ->
property_paired_not_set().
exit_property_paired_not_set().
-spec property_paired_not_set() -> no_return().
property_paired_not_set() ->
-spec exit_property_paired_not_set() -> no_return().
exit_property_paired_not_set() ->
protocol_error(?V_1_0_AMQP_ERROR_INVALID_FIELD,
"Link property 'paired' is not set to boolean value 'true'", []).
-spec not_implemented(io:format()) -> no_return().
not_implemented(Format) ->
not_implemented(Format, []).
-spec exit_not_implemented(io:format()) -> no_return().
exit_not_implemented(Format) ->
exit_not_implemented(Format, []).
-spec not_implemented(io:format(), [term()]) -> no_return().
not_implemented(Format, Args) ->
-spec exit_not_implemented(io:format(), [term()]) -> no_return().
exit_not_implemented(Format, Args) ->
protocol_error(?V_1_0_AMQP_ERROR_NOT_IMPLEMENTED, Format, Args).
-spec not_found(rabbit_types:r(exchange | queue)) -> no_return().
not_found(Resource) ->
-spec exit_not_found(rabbit_types:r(exchange | queue)) -> no_return().
exit_not_found(Resource) ->
protocol_error(?V_1_0_AMQP_ERROR_NOT_FOUND,
"no ~ts",
[rabbit_misc:rs(Resource)]).
-spec error_not_found(rabbit_types:r(exchange | queue)) -> #'v1_0.error'{}.
error_not_found(Resource) ->
Description = unicode:characters_to_binary("no " ++ rabbit_misc:rs(Resource)),
#'v1_0.error'{
condition = ?V_1_0_AMQP_ERROR_NOT_FOUND,
description = {utf8, Description}}.
address_v1_permitted() ->
rabbit_deprecated_features:is_permitted(amqp_address_v1).

View File

@ -391,16 +391,19 @@ target_per_message_unset_to_address(Config) ->
ok = wait_for_credit(Sender),
%% Send message with 'to' unset.
ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<0>>, <<0>>)),
receive
{amqp10_event,
{session, Session,
{ended,
#'v1_0.error'{
condition = ?V_1_0_AMQP_ERROR_PRECONDITION_FAILED,
description = {utf8, <<"anonymous terminus requires 'to' address to be set">>}}}}} -> ok
after 5000 -> ct:fail({missing_event, ?LINE})
DTag = <<1>>,
ok = amqp10_client:send_msg(Sender, amqp10_msg:new(DTag, <<0>>)),
ok = wait_for_settled(released, DTag),
receive {amqp10_event,
{link, Sender,
{detached,
#'v1_0.error'{
condition = ?V_1_0_AMQP_ERROR_PRECONDITION_FAILED,
description = {utf8, <<"anonymous terminus requires 'to' address to be set">>}}}}} -> ok
after 5000 -> ct:fail("server did not close our outgoing link")
end,
ok = amqp10_client:end_session(Session),
ok = amqp10_client:close_connection(Connection).
bad_v2_addresses() ->
@ -436,17 +439,20 @@ target_per_message_bad_to_address0(Address, Config) ->
{ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, null),
ok = wait_for_credit(Sender),
Msg = amqp10_msg:set_properties(#{to => Address}, amqp10_msg:new(<<0>>, <<0>>)),
DTag = <<255>>,
Msg = amqp10_msg:set_properties(#{to => Address}, amqp10_msg:new(DTag, <<0>>)),
ok = amqp10_client:send_msg(Sender, Msg),
receive
{amqp10_event,
{session, Session,
{ended,
#'v1_0.error'{
condition = ?V_1_0_AMQP_ERROR_PRECONDITION_FAILED,
description = {utf8, <<"bad 'to' address", _Rest/binary>>}}}}} -> ok
after 5000 -> ct:fail({missing_event, ?LINE, Address})
ok = wait_for_settled(released, DTag),
receive {amqp10_event,
{link, Sender,
{detached,
#'v1_0.error'{
condition = ?V_1_0_AMQP_ERROR_PRECONDITION_FAILED,
description = {utf8, <<"bad 'to' address", _Rest/binary>>}}}}} -> ok
after 5000 -> ct:fail("server did not close our outgoing link")
end,
ok = amqp10_client:end_session(Session),
ok = amqp10_client:close_connection(Connection).
target_per_message_exchange_absent(Config) ->