rationalise publish/route result types
This commit is contained in:
parent
803764bc44
commit
2706342d07
|
|
@ -143,6 +143,7 @@
|
|||
host :: string() | atom(),
|
||||
port :: non_neg_integer()}).
|
||||
-type(not_found() :: {'error', 'not_found'}).
|
||||
-type(routing_result() :: 'routed' | 'unroutable' | 'not_delivered').
|
||||
|
||||
-endif.
|
||||
|
||||
|
|
|
|||
|
|
@ -323,24 +323,24 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
|
|||
routing_key = RoutingKey,
|
||||
content = DecodedContent,
|
||||
persistent_key = PersistentKey},
|
||||
Handled =
|
||||
case rabbit_exchange:publish(Exchange, Mandatory, Immediate, TxnKey,
|
||||
Message) of
|
||||
{ok, DeliveredQPids} -> DeliveredQPids;
|
||||
{error, unroutable, DeliveredQPids} ->
|
||||
%% FIXME: 312 should be replaced by the ?NO_ROUTE
|
||||
%% definition, when we move to >=0-9
|
||||
ok = basic_return(Message, WriterPid, 312, <<"unroutable">>),
|
||||
DeliveredQPids;
|
||||
{error, not_delivered, DeliveredQPids} ->
|
||||
%% FIXME: 313 should be replaced by the ?NO_CONSUMERS
|
||||
%% definition, when we move to >=0-9
|
||||
ok = basic_return(Message, WriterPid, 313, <<"not_delivered">>),
|
||||
DeliveredQPids
|
||||
end,
|
||||
{RoutingRes, DeliveredQPids} =
|
||||
rabbit_exchange:publish(Exchange, Mandatory, Immediate, TxnKey,
|
||||
Message),
|
||||
case RoutingRes of
|
||||
routed ->
|
||||
ok;
|
||||
unroutable ->
|
||||
%% FIXME: 312 should be replaced by the ?NO_ROUTE
|
||||
%% definition, when we move to >=0-9
|
||||
ok = basic_return(Message, WriterPid, 312, <<"unroutable">>);
|
||||
not_delivered ->
|
||||
%% FIXME: 313 should be replaced by the ?NO_CONSUMERS
|
||||
%% definition, when we move to >=0-9
|
||||
ok = basic_return(Message, WriterPid, 313, <<"not_delivered">>)
|
||||
end,
|
||||
{noreply, case TxnKey of
|
||||
none -> State;
|
||||
_ -> add_tx_participants(Handled, State)
|
||||
_ -> add_tx_participants(DeliveredQPids, State)
|
||||
end};
|
||||
|
||||
handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
|
||||
|
|
|
|||
|
|
@ -74,7 +74,7 @@ publish(_Other, _Format, _Data, _State) ->
|
|||
ok.
|
||||
|
||||
publish1(RoutingKey, Format, Data, LogExch) ->
|
||||
{ok, _QueueNames} = rabbit_exchange:simple_publish(
|
||||
{ok, _RoutingRes} = rabbit_exchange:simple_publish(
|
||||
false, false, LogExch, RoutingKey, <<"text/plain">>,
|
||||
list_to_binary(io_lib:format(Format, Data))),
|
||||
ok.
|
||||
|
|
|
|||
|
|
@ -36,8 +36,7 @@
|
|||
|
||||
-export([recover/0, declare/5, lookup/1, lookup_or_die/1,
|
||||
list/1, info/1, info/2, info_all/1, info_all/2,
|
||||
publish/5, simple_publish/6, simple_publish/3,
|
||||
route/3]).
|
||||
publish/5, simple_publish/6, simple_publish/3, route/3]).
|
||||
-export([add_binding/4, delete_binding/4, list_bindings/1]).
|
||||
-export([delete/2]).
|
||||
-export([delete_queue_bindings/1, delete_transient_queue_bindings/1]).
|
||||
|
|
@ -57,8 +56,6 @@
|
|||
|
||||
-ifdef(use_specs).
|
||||
|
||||
-type(publish_res() :: {'ok', [pid()]} |
|
||||
{'error', 'not_found' | 'unroutable' | 'not_delivered', [pid()]}).
|
||||
-type(bind_res() :: 'ok' | {'error',
|
||||
'queue_not_found' |
|
||||
'exchange_not_found' |
|
||||
|
|
@ -76,11 +73,12 @@
|
|||
-spec(info_all/1 :: (vhost()) -> [[info()]]).
|
||||
-spec(info_all/2 :: (vhost(), [info_key()]) -> [[info()]]).
|
||||
-spec(publish/5 :: (exchange(), bool(), bool(), maybe(txn()), message()) ->
|
||||
publish_res()).
|
||||
{routing_result(), [pid()]}).
|
||||
-spec(simple_publish/6 ::
|
||||
(bool(), bool(), exchange_name(), routing_key(), binary(), binary()) ->
|
||||
publish_res()).
|
||||
-spec(simple_publish/3 :: (bool(), bool(), message()) -> publish_res()).
|
||||
{ok, routing_result()} | not_found()).
|
||||
-spec(simple_publish/3 :: (bool(), bool(), message()) ->
|
||||
{ok, routing_result()} | not_found()).
|
||||
-spec(route/3 :: (exchange(), routing_key(), decoded_content()) -> [pid()]).
|
||||
-spec(add_binding/4 ::
|
||||
(exchange_name(), queue_name(), routing_key(), amqp_table()) ->
|
||||
|
|
@ -202,13 +200,11 @@ publish(X, Mandatory, Immediate, Txn,
|
|||
Message = #basic_message{routing_key = RK, content = C}) ->
|
||||
case rabbit_router:deliver(route(X, RK, C),
|
||||
Mandatory, Immediate, Txn, Message) of
|
||||
R = {ok, [_|_]} ->
|
||||
R;
|
||||
{ok, []} ->
|
||||
{ok, _} = handle_unrouted(X, Txn, Message);
|
||||
{error, Error} ->
|
||||
{ok, DeliveredQPids} = handle_unrouted(X, Txn, Message),
|
||||
{error, Error, DeliveredQPids}
|
||||
{RoutingRes, []} ->
|
||||
{routed, DeliveredQPids} = handle_unrouted(X, Txn, Message),
|
||||
{RoutingRes, DeliveredQPids};
|
||||
Other ->
|
||||
Other
|
||||
end.
|
||||
|
||||
handle_unrouted(#exchange{name = XName, arguments = Args}, Txn, Message) ->
|
||||
|
|
@ -222,10 +218,10 @@ handle_unrouted(#exchange{name = XName, arguments = Args}, Txn, Message) ->
|
|||
rabbit_log:warning(
|
||||
"unroutable message exchange for ~s does not exist: ~s",
|
||||
[rabbit_misc:rs(XName), rabbit_misc:rs(UmeName)]),
|
||||
{ok, []}
|
||||
{routed, []}
|
||||
end;
|
||||
false ->
|
||||
{ok, []}
|
||||
{routed, []}
|
||||
end.
|
||||
|
||||
%% Usable by Erlang code that wants to publish messages.
|
||||
|
|
@ -246,8 +242,10 @@ simple_publish(Mandatory, Immediate, ExchangeName, RoutingKeyBin,
|
|||
simple_publish(Mandatory, Immediate,
|
||||
Message = #basic_message{exchange_name = ExchangeName}) ->
|
||||
case lookup(ExchangeName) of
|
||||
{ok, X} -> publish(X, Mandatory, Immediate, none, Message);
|
||||
{error, Error} -> {error, Error, []}
|
||||
{ok, X} -> {RoutingRes, _} = publish(X, Mandatory, Immediate, none,
|
||||
Message),
|
||||
{ok, RoutingRes};
|
||||
Other -> Other
|
||||
end.
|
||||
|
||||
sort_arguments(Arguments) ->
|
||||
|
|
|
|||
|
|
@ -51,7 +51,7 @@
|
|||
|
||||
-spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}).
|
||||
-spec(deliver/5 :: ([pid()], bool(), bool(), maybe(txn()), message()) ->
|
||||
{'ok', [pid()]} | {'error', 'unroutable' | 'not_delivered'}).
|
||||
{routing_result(), [pid()]}).
|
||||
|
||||
-endif.
|
||||
|
||||
|
|
@ -98,14 +98,15 @@ deliver_per_node(NodeQPids, Mandatory = false, Immediate = false,
|
|||
%% therefore safe to use a fire-and-forget cast here and return
|
||||
%% the QPids - the semantics is preserved. This scales much better
|
||||
%% than the non-immediate case below.
|
||||
{ok, lists:flatmap(
|
||||
fun ({Node, QPids}) ->
|
||||
gen_server2:cast(
|
||||
{?SERVER, Node},
|
||||
{deliver, QPids, Mandatory, Immediate, Txn, Message}),
|
||||
QPids
|
||||
end,
|
||||
NodeQPids)};
|
||||
{routed,
|
||||
lists:flatmap(
|
||||
fun ({Node, QPids}) ->
|
||||
gen_server2:cast(
|
||||
{?SERVER, Node},
|
||||
{deliver, QPids, Mandatory, Immediate, Txn, Message}),
|
||||
QPids
|
||||
end,
|
||||
NodeQPids)};
|
||||
deliver_per_node(NodeQPids, Mandatory, Immediate,
|
||||
Txn, Message) ->
|
||||
R = rabbit_misc:upmap(
|
||||
|
|
@ -179,6 +180,6 @@ run_bindings(QPids, IsMandatory, IsImmediate, Txn, Message) ->
|
|||
QPids).
|
||||
|
||||
%% check_delivery(Mandatory, Immediate, {WasRouted, QPids})
|
||||
check_delivery(true, _ , {false, []}) -> {error, unroutable};
|
||||
check_delivery(_ , true, {_ , []}) -> {error, not_delivered};
|
||||
check_delivery(_ , _ , {_ , Qs}) -> {ok, Qs}.
|
||||
check_delivery(true, _ , {false, []}) -> {unroutable, []};
|
||||
check_delivery(_ , true, {_ , []}) -> {not_delivered, []};
|
||||
check_delivery(_ , _ , {_ , Qs}) -> {routed, Qs}.
|
||||
|
|
|
|||
Loading…
Reference in New Issue