Fix race condition where vhost_msg_stats were not updated after queue delete

kjnilsson committed 2016-11-30 12:38:03 +00:00
parent b1a4a2e6a1
commit 2777343d2d
2 changed files with 56 additions and 39 deletions
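Before this change, removing a queue had the stats database call straight into the metrics collector (second file below) so that the deleted queue's message counts would be subtracted from its vhost totals. That cross-process call could race with the collector's own aggregation pass, leaving vhost_msg_stats out of sync after a queue delete, which is the race the title describes. The fix moves deletion accounting into the collector itself: each aggregation pass threads the previous pass's per-queue stats through the fold, erases every queue it still sees, and treats whatever remains as deleted, writing negated counts into vhost_msg_stats. A minimal sketch of that remainder pattern follows; the names (remainder_sketch, pass/2, emit_negative/4) are illustrative only and not part of the plugin:

%% Sketch of the detect-deletions-by-remainder pattern this commit adopts.
%% OldStats: dict() of Queue -> {Ready, Unack, Msgs} from the previous pass.
%% Live:     [{Queue, {Ready, Unack, Msgs}}] as read on the current pass.
-module(remainder_sketch).
-export([pass/2]).

pass(Live, OldStats) ->
    {NextStats, Remainders} =
        lists:foldl(fun ({Q, Counts}, {Next, Old}) ->
                            {dict:store(Q, Counts, Next), dict:erase(Q, Old)}
                    end, {dict:new(), OldStats}, Live),
    %% Anything left in Remainders vanished since the last pass: cancel its
    %% contribution to the per-vhost running totals.
    [emit_negative(Q, R, U, M) || {Q, {R, U, M}} <- dict:to_list(Remainders)],
    NextStats.

%% Stand-in for the negated vhost_msg_stats insert the real code performs.
emit_negative(Q, R, U, M) ->
    io:format("~p: subtract {~b,~b,~b} from the vhost totals~n", [Q, R, U, M]).

For example, remainder_sketch:pass([{q1, {1, 0, 1}}], dict:from_list([{q1, {2, 0, 2}}, {q2, {5, 1, 6}}])) reports q2 as deleted, while q1 simply gets its stats refreshed for the next pass.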

View File

@@ -144,24 +144,37 @@ take_smaller(Policies) ->
 insert_old_aggr_stats(NextStats, Id, Stat) ->
     dict:store(Id, Stat, NextStats).
 
-aggregate_metrics(Timestamp, State) ->
-    Table = State#state.table,
-    Res = ets:foldl(
-            fun(R, Dict) ->
-                    aggregate_entry(Timestamp, R, Dict, State)
-            end, dict:new(), Table),
-    State#state{old_aggr_stats = Res}.
+handle_deleted_queues(queue_coarse_metrics, Remainders, GPolicies) ->
+    TS = exometer_slide:timestamp(),
+    lists:foreach(fun ({Queue, {R, U, M}}) ->
+                          NegStats = ?vhost_msg_stats(-R, -U, -M),
+                          [insert_entry(vhost_msg_stats, vhost(Queue), TS,
+                                        NegStats, Size, Interval, true)
+                           || {Size, Interval} <- GPolicies]
+                  end,
+                  dict:to_list(Remainders));
+handle_deleted_queues(_T, _R, _P) -> ok.
+
+aggregate_metrics(Timestamp, #state{table = Table,
+                                    policies = {_, _, GPolicies}} = State0) ->
+    Table = State0#state.table,
+    {Next, #state{old_aggr_stats = Remainders}} = ets:foldl(
+        fun(R, {Dict, State}) ->
+            aggregate_entry(Timestamp, R, Dict, State)
+        end, {dict:new(), State0}, Table),
+    handle_deleted_queues(Table, Remainders, GPolicies),
+    State0#state{old_aggr_stats = Next}.
 
-aggregate_entry(_TS, {Id, Metrics}, NextStats, #state{table = connection_created}) ->
+aggregate_entry(_TS, {Id, Metrics}, NextStats, #state{table = connection_created} = State) ->
     Ftd = rabbit_mgmt_format:format(
             Metrics,
             {fun rabbit_mgmt_format:format_connection_created/1, true}),
     ets:insert(connection_created_stats,
                ?connection_created_stats(Id, pget(name, Ftd, unknown), Ftd)),
-    NextStats;
-aggregate_entry(_TS, {Id, Metrics}, NextStats, #state{table = connection_metrics}) ->
+    {NextStats, State};
+aggregate_entry(_TS, {Id, Metrics}, NextStats, #state{table = connection_metrics} = State) ->
     ets:insert(connection_stats, ?connection_stats(Id, Metrics)),
-    NextStats;
+    {NextStats, State};
 aggregate_entry(TS, {Id, RecvOct, SendOct, Reductions}, NextStats,
                 #state{table = connection_coarse_metrics,
                        policies = {BPolicies, _, GPolicies}} = State) ->
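Two details worth noting in the hunk above. First, the negated samples are written with insert_entry/7's final Incremental argument set to true, so the {-R, -U, -M} entry is added into the vhost's running counters, subtracting the deleted queue's last-known totals rather than overwriting anything. Second, the fold's accumulator is now a {Dict, State} pair, which is why every aggregate_entry/4 clause below changes to return {NextStats, State}: the queue_coarse_metrics clause uses that threaded state to prune old_aggr_stats as it goes, and the leftovers become the Remainders handed to handle_deleted_queues/3.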
@@ -174,17 +187,17 @@ aggregate_entry(TS, {Id, RecvOct, SendOct, Reductions}, NextStats,
                       ?connection_stats_coarse_conn_stats(RecvOct, SendOct, Reductions),
                       Size, Interval, false)
      end || {Size, Interval} <- BPolicies],
-    insert_old_aggr_stats(NextStats, Id, Stats);
-aggregate_entry(_TS, {Id, Metrics}, NextStats, #state{table = channel_created}) ->
+    {insert_old_aggr_stats(NextStats, Id, Stats), State};
+aggregate_entry(_TS, {Id, Metrics}, NextStats, #state{table = channel_created} = State) ->
     Ftd = rabbit_mgmt_format:format(Metrics, {[], false}),
     ets:insert(channel_created_stats,
                ?channel_created_stats(Id, pget(name, Ftd, unknown), Ftd)),
-    NextStats;
-aggregate_entry(_TS, {Id, Metrics}, NextStats, #state{table = channel_metrics}) ->
+    {NextStats, State};
+aggregate_entry(_TS, {Id, Metrics}, NextStats, #state{table = channel_metrics} = State) ->
     Ftd = rabbit_mgmt_format:format(Metrics,
                                     {fun rabbit_mgmt_format:format_channel_stats/1, true}),
     ets:insert(channel_stats, ?channel_stats(Id, Ftd)),
-    NextStats;
+    {NextStats, State};
 aggregate_entry(TS, {{Ch, X} = Id, Publish0, Confirm, ReturnUnroutable}, NextStats,
                 #state{table = channel_exchange_metrics,
                        policies = {BPolicies, DPolicies, GPolicies},
@@ -215,7 +228,7 @@ aggregate_entry(TS, {{Ch, X} = Id, Publish0, Confirm, ReturnUnroutable}, NextSta
         _ ->
             ok
     end,
-    insert_old_aggr_stats(NextStats, Id, Stats);
+    {insert_old_aggr_stats(NextStats, Id, Stats), State};
 aggregate_entry(TS, {{Ch, Q} = Id, Get, GetNoAck, Deliver, DeliverNoAck, Redeliver, Ack},
                 NextStats,
                 #state{table = channel_queue_metrics,
@@ -247,7 +260,7 @@ aggregate_entry(TS, {{Ch, Q} = Id, Get, GetNoAck, Deliver, DeliverNoAck, Redeliv
         _ ->
             ok
     end,
-    insert_old_aggr_stats(NextStats, Id, Stats);
+    {insert_old_aggr_stats(NextStats, Id, Stats), State};
 aggregate_entry(TS, {{_Ch, {Q, X}} = Id, Publish}, NextStats,
                 #state{table = channel_queue_exchange_metrics,
                        policies = {BPolicies, _, _},
@@ -277,22 +290,23 @@ aggregate_entry(TS, {{_Ch, {Q, X}} = Id, Publish}, NextStats,
         _ ->
             ok
     end,
-    insert_old_aggr_stats(NextStats, Id, Stats);
-aggregate_entry(TS, {Id, Reductions}, NextStats, #state{table = channel_process_metrics,
-                                                        policies = {BPolicies, _, _}}) ->
+    {insert_old_aggr_stats(NextStats, Id, Stats), State};
+aggregate_entry(TS, {Id, Reductions}, NextStats,
+                #state{table = channel_process_metrics,
+                       policies = {BPolicies, _, _}} = State) ->
     [begin
          insert_entry(channel_process_stats, Id, TS, ?channel_process_stats(Reductions),
                       Size, Interval, false)
      end || {Size, Interval} <- BPolicies],
-    NextStats;
+    {NextStats, State};
 aggregate_entry(_TS, {Id, Exclusive, AckRequired, PrefetchCount, Args}, NextStats,
-                #state{table = consumer_created}) ->
+                #state{table = consumer_created} = State) ->
     Fmt = rabbit_mgmt_format:format([{exclusive, Exclusive},
                                      {ack_required, AckRequired},
                                      {prefetch_count, PrefetchCount},
                                      {arguments, Args}], {[], false}),
     insert_with_index(consumer_stats, Id, ?consumer_stats(Id, Fmt)),
-    NextStats;
+    {NextStats, State};
 aggregate_entry(TS, {Id, Metrics}, NextStats, #state{table = queue_metrics,
                                                      policies = {BPolicies, _, GPolicies},
                                                      lookup_queue = QueueFun} = State) ->
@@ -311,9 +325,10 @@ aggregate_entry(TS, {Id, Metrics}, NextStats, #state{table = queue_metrics,
         false ->
             ok
     end,
-    insert_old_aggr_stats(NextStats, Id, Stats);
+    {insert_old_aggr_stats(NextStats, Id, Stats), State};
 aggregate_entry(TS, {Name, Ready, Unack, Msgs, Red}, NextStats,
                 #state{table = queue_coarse_metrics,
+                       old_aggr_stats = Old,
                        policies = {BPolicies, _, GPolicies},
                        lookup_queue = QueueFun} = State) ->
     Stats = ?vhost_msg_stats(Ready, Unack, Msgs),
@@ -331,12 +346,14 @@ aggregate_entry(TS, {Name, Ready, Unack, Msgs, Red}, NextStats,
         _ ->
             ok
     end,
-    insert_old_aggr_stats(NextStats, Name, Stats);
-aggregate_entry(_TS, {Id, Metrics}, NextStats, #state{table = node_metrics}) ->
+    State1 = State#state{old_aggr_stats = dict:erase(Name, Old)},
+    {insert_old_aggr_stats(NextStats, Name, Stats), State1};
+aggregate_entry(_TS, {Id, Metrics}, NextStats, #state{table = node_metrics} = State) ->
     ets:insert(node_stats, {Id, Metrics}),
-    NextStats;
-aggregate_entry(TS, {Id, Metrics}, NextStats, #state{table = node_coarse_metrics,
-                                                     policies = {_, _, GPolicies}}) ->
+    {NextStats, State};
+aggregate_entry(TS, {Id, Metrics}, NextStats,
+                #state{table = node_coarse_metrics,
+                       policies = {_, _, GPolicies}} = State) ->
     Stats = ?node_coarse_stats(
               pget(fd_used, Metrics, 0), pget(sockets_used, Metrics, 0),
               pget(mem_used, Metrics, 0), pget(disk_free, Metrics, 0),
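The hunk above is the other half of the remainder trick: the queue_coarse_metrics clause now binds old_aggr_stats as Old and erases each queue it processes via dict:erase(Name, Old), so by the time the fold completes, old_aggr_stats holds only queues that disappeared since the previous pass, and exactly those reach handle_deleted_queues/3 as Remainders.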
@@ -344,9 +361,10 @@ aggregate_entry(TS, {Id, Metrics}, NextStats, #state{table = node_coarse_metrics
               pget(gc_bytes_reclaimed, Metrics, 0), pget(context_switches, Metrics, 0)),
     [insert_entry(node_coarse_stats, Id, TS, Stats, Size, Interval, false)
      || {Size, Interval} <- GPolicies],
-    NextStats;
-aggregate_entry(TS, {Id, Metrics}, NextStats, #state{table = node_persister_metrics,
-                                                     policies = {_, _, GPolicies}}) ->
+    {NextStats, State};
+aggregate_entry(TS, {Id, Metrics}, NextStats,
+                #state{table = node_persister_metrics,
+                       policies = {_, _, GPolicies}} = State) ->
     Stats = ?node_persister_stats(
               pget(io_read_count, Metrics, 0), pget(io_read_bytes, Metrics, 0),
               pget(io_read_time, Metrics, 0), pget(io_write_count, Metrics, 0),
@@ -362,15 +380,16 @@ aggregate_entry(TS, {Id, Metrics}, NextStats, #state{table = node_persister_metr
               pget(io_file_handle_open_attempt_time, Metrics, 0)),
     [insert_entry(node_persister_stats, Id, TS, Stats, Size, Interval, false)
      || {Size, Interval} <- GPolicies],
-    NextStats;
-aggregate_entry(TS, {Id, Metrics}, NextStats, #state{table = node_node_metrics,
-                                                     policies = {_, _, GPolicies}}) ->
+    {NextStats, State};
+aggregate_entry(TS, {Id, Metrics}, NextStats,
+                #state{table = node_node_metrics,
+                       policies = {_, _, GPolicies}} = State) ->
     Stats = ?node_node_coarse_stats(pget(send_bytes, Metrics, 0), pget(recv_bytes, Metrics, 0)),
     CleanMetrics = lists:keydelete(recv_bytes, 1, lists:keydelete(send_bytes, 1, Metrics)),
     ets:insert(node_node_stats, ?node_node_stats(Id, CleanMetrics)),
     [insert_entry(node_node_coarse_stats, Id, TS, Stats, Size, Interval, false)
      || {Size, Interval} <- GPolicies],
-    NextStats.
+    {NextStats, State}.
 
 insert_entry(Table, Id, TS, Entry, Size, Interval, Incremental) ->
     Key = {Id, Interval},

View File

@@ -123,8 +123,6 @@ remove_queue(Name, BIntervals) ->
     delete_samples(queue_process_stats, Name, BIntervals),
     delete_samples(queue_msg_stats, Name, BIntervals),
     delete_samples(queue_msg_rates, Name, BIntervals),
-    %% vhost message counts must be updated with the deletion of the messages in this queue
-    rabbit_mgmt_metrics_collector:delete_queue(queue_coarse_metrics, Name),
     index_delete(channel_queue_stats_deliver_stats, queue, Name),
     index_delete(queue_exchange_stats_publish, queue, Name),
     index_delete(consumer_stats, queue, Name),
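With the collector reconciling deleted queues on its own schedule, remove_queue/2 no longer needs to reach into rabbit_mgmt_metrics_collector at all, so the cross-module call (and the comment justifying it) is dropped, closing the window in which that call could race with an in-flight aggregation pass.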