Make MC conversion function return ok or error

This commit is contained in:
Arnaud Cogoluègnes 2023-09-07 16:56:29 +02:00
parent 1a63e1af33
commit e22fcd70fe
No known key found for this signature in database
GPG Key ID: D5C8C4DFAD43AFA8
12 changed files with 41 additions and 34 deletions

View File

@ -433,12 +433,13 @@ protocol_state(Content0, Anns) ->
%% changed
protocol_state(prepare(read, Content0), Anns).
-spec message(rabbit_types:exchange_name(), rabbit_types:routing_key(), #content{}) -> mc:state() | {error, Reason :: any()}.
-spec message(rabbit_types:exchange_name(), rabbit_types:routing_key(), #content{}) ->
{ok, mc:state()} | {error, Reason :: any()}.
message(ExchangeName, RoutingKey, Content) ->
message(ExchangeName, RoutingKey, Content, #{}).
-spec message(rabbit_types:exchange_name(), rabbit_types:routing_key(), #content{}, map()) ->
mc:state().
{ok, mc:state()} | {error, Reason :: any()}.
message(XName, RoutingKey, Content, Anns) ->
message(XName, RoutingKey, Content, Anns,
rabbit_feature_flags:is_enabled(message_containers)).
@ -453,19 +454,23 @@ message(#resource{name = ExchangeNameBin}, RoutingKey,
{error, _} = Error ->
Error;
HeaderRoutes ->
mc:init(?MODULE,
rabbit_basic:strip_bcc_header(Content),
Anns#{routing_keys => [RoutingKey | HeaderRoutes],
exchange => ExchangeNameBin})
{ok, mc:init(?MODULE,
rabbit_basic:strip_bcc_header(Content),
Anns#{routing_keys => [RoutingKey | HeaderRoutes],
exchange => ExchangeNameBin})}
end;
message(#resource{} = XName, RoutingKey,
#content{} = Content, Anns, false) ->
{ok, Msg} = rabbit_basic:message(XName, RoutingKey, Content),
case Anns of
#{id := Id} ->
Msg#basic_message{id = Id};
_ ->
Msg
case rabbit_basic:message(XName, RoutingKey, Content) of
{ok, Msg} ->
case Anns of
#{id := Id} ->
{ok, Msg#basic_message{id = Id}};
_ ->
{ok, Msg}
end;
{error, _} = Error ->
Error
end.
from_basic_message(#basic_message{content = Content,
@ -478,7 +483,8 @@ from_basic_message(#basic_message{content = Content,
_ ->
#{id => Id}
end,
message(Ex, RKey, prepare(read, Content), Anns, true).
{ok, Msg} = message(Ex, RKey, prepare(read, Content), Anns, true),
Msg.
%% Internal

View File

@ -1289,7 +1289,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
DecodedContent) of
{error, Reason} ->
precondition_failed("invalid message: ~tp", [Reason]);
Message0 ->
{ok, Message0} ->
Message = rabbit_message_interceptor:intercept(Message0),
QNames = rabbit_exchange:route(Exchange, Message, #{return_binding_keys => true}),
[rabbit_channel:deliver_reply(RK, Message) ||

View File

@ -158,7 +158,7 @@ trace(X, Msg0, RKPrefix, RKSuffix, Extra) ->
++ Extra},
properties_bin = none},
TargetXName = SourceXName#resource{name = ?XNAME},
TraceMsg = mc_amqpl:message(TargetXName, Key, Content),
{ok, TraceMsg} = mc_amqpl:message(TargetXName, Key, Content),
ok = rabbit_queue_type:publish_at_most_once(X, TraceMsg),
ok
end.

View File

@ -1609,7 +1609,7 @@ publish_and_confirm(Q, Payload, Count) ->
Payload),
Content = BMsg#basic_message.content,
Ex = BMsg#basic_message.exchange_name,
Msg = mc_amqpl:message(Ex, <<>>, Content),
{ok, Msg} = mc_amqpl:message(Ex, <<>>, Content),
Options = #{correlation => Seq},
{ok, Acc, _Actions} = rabbit_queue_type:deliver([Q], Msg,
Options, Acc0),
@ -1863,4 +1863,5 @@ message(IsPersistent, PayloadFun, N) ->
false -> 1
end},
PayloadFun(N)),
mc_amqpl:message(Ex, <<>>, Content, #{id => Id}).
{ok, Msg} = mc_amqpl:message(Ex, <<>>, Content, #{id => Id}),
Msg.

View File

@ -801,7 +801,7 @@ cmd_publish_msg(St=#cq{amq=AMQ}, PayloadSize, DeliveryMode, Mandatory, Expiratio
expiration = do_encode_expiration(Expiration)},
Payload),
Msg0 = mc_amqpl:message(Ex, <<>>, BasicMsg#basic_message.content),
{ok, Msg0} = mc_amqpl:message(Ex, <<>>, BasicMsg#basic_message.content),
Msg = mc:set_annotation(id, BasicMsg#basic_message.id, Msg0),
{ok, _, _} = rabbit_queue_type:deliver([AMQ], Msg, #{}, stateless),
Content = mc:protocol_state(Msg),

View File

@ -487,11 +487,11 @@ amqp_amqpl_amqp_bodies(_Config) ->
Ex = #resource{virtual_host = <<"/">>,
kind = exchange,
name = <<"ex">>},
LegacyMsg = mc_amqpl:message(Ex, <<"rkey">>,
#content{payload_fragments_rev =
lists:reverse(EncodedPayload),
properties = Props},
#{}, true),
{ok, LegacyMsg} = mc_amqpl:message(Ex, <<"rkey">>,
#content{payload_fragments_rev =
lists:reverse(EncodedPayload),
properties = Props},
#{}, true),
AmqpMsg = mc:convert(mc_amqp, LegacyMsg),
%% drop any non body sections

View File

@ -368,14 +368,14 @@ info_head_message_timestamp1(_Config) ->
Content1 = #content{properties = #'P_basic'{priority = 1,
timestamp = 1000},
payload_fragments_rev = []},
Msg1 = mc_amqpl:message(ExName, <<>>, Content1, #{id => <<"msg1">>}),
{ok, Msg1} = mc_amqpl:message(ExName, <<>>, Content1, #{id => <<"msg1">>}),
BQS2 = PQ:publish(Msg1, #message_properties{size = 0}, false, self(),
noflow, BQS1),
1000 = PQ:info(head_message_timestamp, BQS2),
%% Publish a higher priority message with no timestamp.
Content2 = #content{properties = #'P_basic'{priority = 2},
payload_fragments_rev = []},
Msg2 = mc_amqpl:message(ExName, <<>>, Content2, #{id => <<"msg2">>}),
{ok, Msg2} = mc_amqpl:message(ExName, <<>>, Content2, #{id => <<"msg2">>}),
BQS3 = PQ:publish(Msg2, #message_properties{size = 0}, false, self(),
noflow, BQS2),
'' = PQ:info(head_message_timestamp, BQS3),

View File

@ -196,9 +196,9 @@ test_topic_expect_match(X, List) ->
BinKey = list_to_binary(Key),
Message = rabbit_basic:message(X#exchange.name, BinKey,
#'P_basic'{}, <<>>),
Msg = mc_amqpl:message(X#exchange.name,
BinKey,
Message#basic_message.content),
{ok, Msg} = mc_amqpl:message(X#exchange.name,
BinKey,
Message#basic_message.content),
Res = rabbit_exchange_type_topic:route(X, Msg),
ExpectedRes = [rabbit_misc:r(?VHOST, queue, list_to_binary(Q)) ||
Q <- Expected],

View File

@ -66,7 +66,7 @@ append_to_acc(_Config) ->
payload_fragments_rev = [[<<"1234567890">>]] %% 10 bytes
},
ExName = rabbit_misc:r(<<>>, exchange, <<>>),
Msg = mc_amqpl:message(ExName, <<>>, Content, #{id => 1}, true),
{ok, Msg} = mc_amqpl:message(ExName, <<>>, Content, #{id => 1}, true),
BQDepth = 10,
SyncThroughput_0 = 0,
FoldAcc1 = {[], 0, {0, erlang:monotonic_time(), SyncThroughput_0}, {0, BQDepth}, erlang:monotonic_time()},

View File

@ -87,7 +87,7 @@ handle_event(#event{type = Type,
TS, milli_seconds, seconds)},
Content = rabbit_basic:build_content(PBasic, <<>>),
XName = exchange(VHost),
Msg = mc_amqpl:message(XName, Key, Content),
{ok, Msg} = mc_amqpl:message(XName, Key, Content),
rabbit_queue_type:publish_at_most_once(XName, Msg)
end,
{ok, State};

View File

@ -2541,6 +2541,6 @@ compat(McMqtt, #state{cfg = #cfg{exchange = XName}}) ->
[RoutingKey] = mc:get_annotation(routing_keys, McMqtt),
McLegacy = mc:convert(mc_amqpl, McMqtt),
Content = mc:protocol_state(McLegacy),
BasicMsg = mc_amqpl:message(XName, RoutingKey, Content, #{}, FFState),
{ok, BasicMsg} = mc_amqpl:message(XName, RoutingKey, Content, #{}, FFState),
rabbit_basic:add_header(<<"x-mqtt-publish-qos">>, byte, Qos, BasicMsg)
end.

View File

@ -431,9 +431,9 @@ handle_call({route, RoutingKey, VirtualHost, SuperStream}, _From,
Res = try
Exchange = rabbit_exchange:lookup_or_die(ExchangeName),
Content = #content{properties = #'P_basic'{}},
DummyMsg = mc_amqpl:message(ExchangeName,
RoutingKey,
Content),
{ok, DummyMsg} = mc_amqpl:message(ExchangeName,
RoutingKey,
Content),
case rabbit_exchange:route(Exchange, DummyMsg) of
[] ->
{ok, no_route};