Merge pull request #3229 from rabbitmq/stream-status-tracking

Fix function_clause error in tracking_status/2
This commit is contained in:
Michael Klishin 2021-07-23 21:08:57 +03:00 committed by GitHub
commit 23432f3130
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 29 additions and 5 deletions

View File

@ -605,10 +605,14 @@ tracking_status(Vhost, QueueName) ->
{ok, Q} when ?amqqueue_is_stream(Q) ->
Leader = amqqueue:get_pid(Q),
Map = osiris:read_tracking(Leader),
maps:fold(fun(K, {Type, Value}, Acc) ->
[[{reference, K},
{type, Type},
{value, Value}] | Acc]
maps:fold(fun(Type, Trackings, Acc) ->
%% Convert for example 'offsets' to 'offset' or 'sequences' to 'sequence'
T = list_to_atom(lists:droplast(atom_to_list(Type))),
maps:fold(fun(TrkId, TrkData, Acc0) ->
[[{type, T},
{reference, TrkId},
{value, TrkData}] | Acc0]
end, [], Trackings) ++ Acc
end, [], Map);
{error, not_found} = E->
E

View File

@ -101,7 +101,8 @@ all_tests() ->
max_segment_size_bytes_policy,
purge,
update_retention_policy,
queue_info
queue_info,
tracking_status
].
%% -------------------------------------------------------------------
@ -1044,6 +1045,25 @@ consume_and_ack(Config) ->
end,
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
tracking_status(Config) ->
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
Q = ?config(queue_name, Config),
?assertEqual({'queue.declare_ok', Q, 0, 0},
declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
Vhost = ?config(rmq_vhost, Config),
?assertEqual([], rabbit_ct_broker_helpers:rpc(Config, Server, rabbit_stream_queue, ?FUNCTION_NAME, [Vhost, Q])),
publish_confirm(Ch, Q, [<<"msg">>]),
?assertMatch([[
{type, sequence},
{reference, _WriterID},
{value, {_Offset = 0, _Seq = 1}}
]],
rabbit_ct_broker_helpers:rpc(Config, Server, rabbit_stream_queue, ?FUNCTION_NAME, [Vhost, Q])),
rabbit_ct_broker_helpers:rpc(Config, Server, ?MODULE, delete_testcase_queue, [Q]).
consume_from_last(Config) ->
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),