Delay generating the basic_message id (guid)
until we know if it is needed or not.
This commit is contained in:
parent
105f017e85
commit
ead68d2079
|
|
@ -10,7 +10,7 @@
|
||||||
-include_lib("rabbit_common/include/rabbit_framing.hrl").
|
-include_lib("rabbit_common/include/rabbit_framing.hrl").
|
||||||
|
|
||||||
-export([publish/4, publish/5, publish/1,
|
-export([publish/4, publish/5, publish/1,
|
||||||
message/3, message/4, properties/1, prepend_table_header/3,
|
message/3, message_no_id/3, message/4, properties/1, prepend_table_header/3,
|
||||||
extract_headers/1, extract_timestamp/1, map_headers/2, delivery/4,
|
extract_headers/1, extract_timestamp/1, map_headers/2, delivery/4,
|
||||||
header_routes/1, parse_expiration/1, header/2, header/3]).
|
header_routes/1, parse_expiration/1, header/2, header/3]).
|
||||||
-export([build_content/2, from_content/1, msg_size/1,
|
-export([build_content/2, from_content/1, msg_size/1,
|
||||||
|
|
@ -130,19 +130,17 @@ strip_header(#content{properties = Props = #'P_basic'{headers = Headers}}
|
||||||
(rabbit_exchange:name(), rabbit_router:routing_key(),
|
(rabbit_exchange:name(), rabbit_router:routing_key(),
|
||||||
rabbit_types:decoded_content()) ->
|
rabbit_types:decoded_content()) ->
|
||||||
rabbit_types:ok_or_error2(rabbit_types:message(), any()).
|
rabbit_types:ok_or_error2(rabbit_types:message(), any()).
|
||||||
|
message(XName, RoutingKey, Content) ->
|
||||||
|
make_message(XName, RoutingKey, Content, rabbit_guid:gen()).
|
||||||
|
|
||||||
message(XName, RoutingKey, #content{properties = Props} = DecodedContent) ->
|
%% only used by channel to avoid unnecessarily generating a guid when
|
||||||
try
|
%% queue types do not need them
|
||||||
{ok, #basic_message{
|
-spec message_no_id
|
||||||
exchange_name = XName,
|
(rabbit_exchange:name(), rabbit_router:routing_key(),
|
||||||
content = strip_header(DecodedContent, ?DELETED_HEADER),
|
rabbit_types:decoded_content()) ->
|
||||||
id = rabbit_guid:gen(),
|
rabbit_types:ok_or_error2(rabbit_types:message(), any()).
|
||||||
is_persistent = is_message_persistent(DecodedContent),
|
message_no_id(XName, RoutingKey, Content) ->
|
||||||
routing_keys = [RoutingKey |
|
make_message(XName, RoutingKey, Content, <<>>).
|
||||||
header_routes(Props#'P_basic'.headers)]}}
|
|
||||||
catch
|
|
||||||
{error, _Reason} = Error -> Error
|
|
||||||
end.
|
|
||||||
|
|
||||||
-spec message
|
-spec message
|
||||||
(rabbit_exchange:name(), rabbit_router:routing_key(), properties_input(),
|
(rabbit_exchange:name(), rabbit_router:routing_key(), properties_input(),
|
||||||
|
|
@ -352,3 +350,17 @@ header_key(A) ->
|
||||||
|
|
||||||
binary_prefix_64(Bin, Len) ->
|
binary_prefix_64(Bin, Len) ->
|
||||||
binary:part(Bin, 0, min(byte_size(Bin), Len)).
|
binary:part(Bin, 0, min(byte_size(Bin), Len)).
|
||||||
|
|
||||||
|
make_message(XName, RoutingKey, #content{properties = Props} = DecodedContent, Guid) ->
|
||||||
|
try
|
||||||
|
{ok, #basic_message{
|
||||||
|
exchange_name = XName,
|
||||||
|
content = strip_header(DecodedContent, ?DELETED_HEADER),
|
||||||
|
id = Guid,
|
||||||
|
is_persistent = is_message_persistent(DecodedContent),
|
||||||
|
routing_keys = [RoutingKey |
|
||||||
|
header_routes(Props#'P_basic'.headers)]}}
|
||||||
|
catch
|
||||||
|
{error, _Reason} = Error -> Error
|
||||||
|
end.
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1315,7 +1315,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
|
||||||
SeqNo = State0#ch.publish_seqno,
|
SeqNo = State0#ch.publish_seqno,
|
||||||
{SeqNo, State0#ch{publish_seqno = SeqNo + 1}}
|
{SeqNo, State0#ch{publish_seqno = SeqNo + 1}}
|
||||||
end,
|
end,
|
||||||
case rabbit_basic:message(ExchangeName, RoutingKey, DecodedContent) of
|
case rabbit_basic:message_no_id(ExchangeName, RoutingKey, DecodedContent) of
|
||||||
{ok, Message} ->
|
{ok, Message} ->
|
||||||
Delivery = rabbit_basic:delivery(
|
Delivery = rabbit_basic:delivery(
|
||||||
Mandatory, DoConfirm, Message, MsgSeqNo),
|
Mandatory, DoConfirm, Message, MsgSeqNo),
|
||||||
|
|
|
||||||
|
|
@ -7,11 +7,12 @@
|
||||||
-record(msg_status, {pending :: [pid()],
|
-record(msg_status, {pending :: [pid()],
|
||||||
confirmed = [] :: [pid()]}).
|
confirmed = [] :: [pid()]}).
|
||||||
|
|
||||||
-record(?MODULE, {pid :: undefined | pid(), %% the current master pid
|
|
||||||
qref :: term(), %% TODO
|
|
||||||
unconfirmed = #{} ::
|
|
||||||
#{non_neg_integer() => #msg_status{}}}).
|
|
||||||
-define(STATE, ?MODULE).
|
-define(STATE, ?MODULE).
|
||||||
|
-record(?STATE, {pid :: undefined | pid(), %% the current master pid
|
||||||
|
qref :: term(), %% TODO
|
||||||
|
unconfirmed = #{} ::
|
||||||
|
#{non_neg_integer() => #msg_status{}}}).
|
||||||
|
|
||||||
|
|
||||||
-opaque state() :: #?STATE{}.
|
-opaque state() :: #?STATE{}.
|
||||||
|
|
||||||
|
|
@ -300,9 +301,13 @@ settlement_action(Type, QRef, MsgSeqs, Acc) ->
|
||||||
{[{amqqueue:amqqueue(), state()}], rabbit_queue_type:actions()}.
|
{[{amqqueue:amqqueue(), state()}], rabbit_queue_type:actions()}.
|
||||||
deliver(Qs0, #delivery{flow = Flow,
|
deliver(Qs0, #delivery{flow = Flow,
|
||||||
msg_seq_no = MsgNo,
|
msg_seq_no = MsgNo,
|
||||||
message = #basic_message{exchange_name = _Ex},
|
message = #basic_message{} = Msg0,
|
||||||
confirm = Confirm} = Delivery) ->
|
confirm = Confirm} = Delivery0) ->
|
||||||
%% TODO: record master and slaves for confirm processing
|
%% add guid to content here instead of in rabbit_basic:message/3,
|
||||||
|
%% as classic queues are the only ones that need it
|
||||||
|
Msg = Msg0#basic_message{id = rabbit_guid:gen()},
|
||||||
|
Delivery = Delivery0#delivery{message = Msg},
|
||||||
|
|
||||||
{MPids, SPids, Qs, Actions} = qpids(Qs0, Confirm, MsgNo),
|
{MPids, SPids, Qs, Actions} = qpids(Qs0, Confirm, MsgNo),
|
||||||
QPids = MPids ++ SPids,
|
QPids = MPids ++ SPids,
|
||||||
case Flow of
|
case Flow of
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue