Merge pull request #10186 from rabbitmq/message-received-at-timestamp
Timestamp (annotate with a timestamp) all messages automatically
This commit is contained in:
commit
d57fde54c5
|
|
@ -148,7 +148,7 @@ init(Proto, Data, Anns0, Env)
|
|||
end,
|
||||
#?MODULE{protocol = Proto,
|
||||
data = ProtoData,
|
||||
annotations = maps:merge(ProtoAnns, Anns)}.
|
||||
annotations = set_received_at_timestamp(maps:merge(ProtoAnns, Anns))}.
|
||||
|
||||
-spec size(state()) ->
|
||||
{MetadataSize :: non_neg_integer(),
|
||||
|
|
@ -425,6 +425,10 @@ is_cycle(Queue, [{Queue, Reason} | _])
|
|||
is_cycle(Queue, [_ | Rem]) ->
|
||||
is_cycle(Queue, Rem).
|
||||
|
||||
set_received_at_timestamp(Anns) ->
|
||||
Millis = os:system_time(millisecond),
|
||||
maps:put(rts, Millis, Anns).
|
||||
|
||||
-ifdef(TEST).
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
-endif.
|
||||
|
|
|
|||
|
|
@ -413,6 +413,8 @@ info(backing_queue_status, #state{bq = BQ, bqss = BQSs}) ->
|
|||
end, nothing, BQSs);
|
||||
info(head_message_timestamp, #state{bq = BQ, bqss = BQSs}) ->
|
||||
find_head_message_timestamp(BQ, BQSs, '');
|
||||
info(oldest_message_received_timestamp, #state{bq = BQ, bqss = BQSs}) ->
|
||||
find_oldest_message_received_timestamp(BQ, BQSs);
|
||||
info(online, _) ->
|
||||
'';
|
||||
info(Item, #state{bq = BQ, bqss = BQSs}) ->
|
||||
|
|
@ -689,6 +691,28 @@ find_head_message_timestamp(BQ, [{_, BQSN} | Rest], Timestamp) ->
|
|||
find_head_message_timestamp(_, [], Timestamp) ->
|
||||
Timestamp.
|
||||
|
||||
find_oldest_message_received_timestamp(BQ, BQs) ->
|
||||
%% Oldest message timestamp among all priority queues
|
||||
Timestamps =
|
||||
lists:foldl(
|
||||
fun({_, BQSN}, Acc) ->
|
||||
case oldest_message_received_timestamp(BQ, BQSN) of
|
||||
'' -> Acc;
|
||||
Ts -> [Ts | Acc]
|
||||
end
|
||||
end, [], BQs),
|
||||
case Timestamps of
|
||||
[] -> '';
|
||||
_ -> lists:min(Timestamps)
|
||||
end.
|
||||
|
||||
oldest_message_received_timestamp(BQ, BQSN) ->
|
||||
MsgCount = BQ:len(BQSN) + BQ:info(messages_unacknowledged_ram, BQSN),
|
||||
if
|
||||
MsgCount =/= 0 -> BQ:info(oldest_message_received_timestamp, BQSN);
|
||||
true -> ''
|
||||
end.
|
||||
|
||||
zip_msgs_and_acks(Pubs, AckTags) ->
|
||||
lists:zipwith(
|
||||
fun ({Msg, _Props}, AckTag) ->
|
||||
|
|
|
|||
|
|
@ -833,6 +833,10 @@ info(head_message_timestamp, #vqstate{
|
|||
q3 = Q3,
|
||||
ram_pending_ack = RPA}) ->
|
||||
head_message_timestamp(Q3, RPA);
|
||||
info(oldest_message_received_timestamp, #vqstate{
|
||||
q3 = Q3,
|
||||
ram_pending_ack = RPA}) ->
|
||||
oldest_message_received_timestamp(Q3, RPA);
|
||||
info(disk_reads, #vqstate{disk_read_count = Count}) ->
|
||||
Count;
|
||||
info(disk_writes, #vqstate{disk_write_count = Count}) ->
|
||||
|
|
@ -1194,7 +1198,6 @@ convert_from_v2_to_v1_loop(QueueName, V1Index0, V2Index0, V2Store0,
|
|||
%% regarded as unprocessed until acked, this also prevents the result
|
||||
%% apparently oscillating during repeated rejects.
|
||||
%%
|
||||
%% @todo OK I think we can do this differently
|
||||
head_message_timestamp(Q3, RPA) ->
|
||||
HeadMsgs = [ HeadMsgStatus#msg_status.msg ||
|
||||
HeadMsgStatus <-
|
||||
|
|
@ -1215,6 +1218,26 @@ head_message_timestamp(Q3, RPA) ->
|
|||
false -> lists:min(Timestamps)
|
||||
end.
|
||||
|
||||
oldest_message_received_timestamp(Q3, RPA) ->
|
||||
HeadMsgs = [ HeadMsgStatus#msg_status.msg ||
|
||||
HeadMsgStatus <-
|
||||
[ get_q_head(Q3),
|
||||
get_pa_head(RPA) ],
|
||||
HeadMsgStatus /= undefined,
|
||||
HeadMsgStatus#msg_status.msg /= undefined ],
|
||||
|
||||
Timestamps =
|
||||
[Timestamp
|
||||
|| HeadMsg <- HeadMsgs,
|
||||
Timestamp <- [mc:get_annotation(rts, HeadMsg)],
|
||||
Timestamp /= undefined
|
||||
],
|
||||
|
||||
case Timestamps == [] of
|
||||
true -> '';
|
||||
false -> lists:min(Timestamps)
|
||||
end.
|
||||
|
||||
get_q_head(Q) ->
|
||||
?QUEUE:get(Q, undefined).
|
||||
|
||||
|
|
|
|||
|
|
@ -294,7 +294,7 @@ amqpl_amqp_bin_amqpl(_Config) ->
|
|||
?assertEqual({utf8, <<"msg-id">>}, mc:message_id(Msg)),
|
||||
?assertEqual(1, mc:ttl(Msg)),
|
||||
?assertEqual({utf8, <<"apple">>}, mc:x_header(<<"x-stream-filter">>, Msg)),
|
||||
|
||||
?assert(is_integer(mc:get_annotation(rts, Msg))),
|
||||
|
||||
%% array type non x-headers cannot be converted into amqp
|
||||
RoutingHeaders = maps:remove(<<"a-array">>, mc:routing_headers(Msg, [])),
|
||||
|
|
@ -316,6 +316,7 @@ amqpl_amqp_bin_amqpl(_Config) ->
|
|||
%% at this point the type is now present as a message annotation
|
||||
?assertEqual({utf8, <<"45">>}, mc:x_header(<<"x-basic-type">>, Msg10)),
|
||||
?assertEqual(RoutingHeaders, mc:routing_headers(Msg10, [])),
|
||||
?assert(is_integer(mc:get_annotation(rts, Msg10))),
|
||||
|
||||
[
|
||||
#'v1_0.header'{} = Hdr10,
|
||||
|
|
@ -370,6 +371,7 @@ amqpl_amqp_bin_amqpl(_Config) ->
|
|||
?assertEqual(1, mc:ttl(MsgL2)),
|
||||
?assertEqual({utf8, <<"apple">>}, mc:x_header(<<"x-stream-filter">>, MsgL2)),
|
||||
?assertEqual(RoutingHeaders, mc:routing_headers(MsgL2, [])),
|
||||
?assert(is_integer(mc:get_annotation(rts, MsgL2))),
|
||||
ok.
|
||||
|
||||
amqpl_cc_amqp_bin_amqpl(_Config) ->
|
||||
|
|
|
|||
|
|
@ -29,6 +29,7 @@ groups() ->
|
|||
dropwhile_fetchwhile,
|
||||
info_head_message_timestamp,
|
||||
info_backing_queue_version,
|
||||
info_oldest_message_received_timestamp,
|
||||
unknown_info_key,
|
||||
matching,
|
||||
purge,
|
||||
|
|
@ -415,6 +416,53 @@ info_backing_queue_version(Config) ->
|
|||
passed
|
||||
end.
|
||||
|
||||
info_oldest_message_received_timestamp(Config) ->
|
||||
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
|
||||
?MODULE, info_oldest_message_received_timestamp1, [Config]).
|
||||
|
||||
info_oldest_message_received_timestamp1(_Config) ->
|
||||
QName = rabbit_misc:r(<<"/">>, queue,
|
||||
<<"info_oldest_message_received_timestamp-queue">>),
|
||||
ExName = rabbit_misc:r(<<"/">>, exchange, <<>>),
|
||||
Q0 = rabbit_amqqueue:pseudo_queue(QName, self()),
|
||||
Q1 = amqqueue:set_arguments(Q0, [{<<"x-max-priority">>, long, 2}]),
|
||||
PQ = rabbit_priority_queue,
|
||||
BQS1 = PQ:init(Q1, new, fun(_, _) -> ok end),
|
||||
%% The queue is empty: no timestamp.
|
||||
true = PQ:is_empty(BQS1),
|
||||
'' = PQ:info(oldest_message_received_timestamp, BQS1),
|
||||
%% Publish one message.
|
||||
Content1 = #content{properties = #'P_basic'{priority = 1},
|
||||
payload_fragments_rev = []},
|
||||
{ok, Msg1} = mc_amqpl:message(ExName, <<>>, Content1, #{id => <<"msg1">>}),
|
||||
BQS2 = PQ:publish(Msg1, #message_properties{size = 0}, false, self(),
|
||||
noflow, BQS1),
|
||||
Ts1 = PQ:info(oldest_message_received_timestamp, BQS2),
|
||||
?assert(is_integer(Ts1)),
|
||||
%% Publish a higher priority message.
|
||||
Content2 = #content{properties = #'P_basic'{priority = 2},
|
||||
payload_fragments_rev = []},
|
||||
{ok, Msg2} = mc_amqpl:message(ExName, <<>>, Content2, #{id => <<"msg2">>}),
|
||||
BQS3 = PQ:publish(Msg2, #message_properties{size = 0}, false, self(),
|
||||
noflow, BQS2),
|
||||
%% Even though is highest priority, the lower priority message is older.
|
||||
%% Timestamp hasn't changed.
|
||||
?assertEqual(Ts1, PQ:info(oldest_message_received_timestamp, BQS3)),
|
||||
%% Consume message.
|
||||
{{Msg2, _, _}, BQS4} = PQ:fetch(false, BQS3),
|
||||
?assertEqual(Ts1, PQ:info(oldest_message_received_timestamp, BQS4)),
|
||||
%% Consume the first message, but do not acknowledge it
|
||||
%% yet. The goal is to verify that the unacknowledged message's
|
||||
%% timestamp is returned.
|
||||
{{Msg1, _, AckTag}, BQS5} = PQ:fetch(true, BQS4),
|
||||
?assertEqual(Ts1, PQ:info(oldest_message_received_timestamp, BQS5)),
|
||||
%% Ack message. The queue is empty now.
|
||||
{[<<"msg1">>], BQS6} = PQ:ack([AckTag], BQS5),
|
||||
true = PQ:is_empty(BQS6),
|
||||
?assertEqual('', PQ:info(oldest_message_received_timestamp, BQS6)),
|
||||
PQ:delete_and_terminate(a_whim, BQS6),
|
||||
passed.
|
||||
|
||||
unknown_info_key(Config) ->
|
||||
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
|
||||
Q = <<"info-priority-queue">>,
|
||||
|
|
|
|||
Loading…
Reference in New Issue