I thought of another race: in theory a queue_stats message could make it out of a dying node just in time, but then be overtaken by the queue_deleted event that was generated by another node. Fairly unlikely, but we should handle it.

This commit is contained in:
Simon MacMullen 2013-02-22 13:54:02 +00:00
parent 1a9cc3ba9b
commit 914f1fa751
1 changed file with 23 additions and 8 deletions

View File

@ -424,7 +424,7 @@ handle_event(Event = #event{type = queue_deleted,
Id = {coarse, {queue_stats, Name}},
TS = floor(Timestamp, State),
OldStats = lookup_element(OldTable, Id),
[record_sample(Id, {Key, -pget(Key, OldStats, 0), TS, State}, State)
[record_sample_coarse(Id, {Key, -pget(Key, OldStats, 0), TS, State}, State)
|| Key <- ?COARSE_QUEUE_STATS],
delete_samples(channel_queue_stats, {'_', Name}, State),
delete_samples(queue_exchange_stats, {Name, '_'}, State),
@ -606,9 +606,15 @@ append_sample(Key, Value, NewMS, OldStats, Id, State) ->
record_sample(
Id, {Key, Value - pget(Key, OldStats, 0), NewMS, State}, State).
record_sample({coarse, {queue_stats, Q} = Id}, Args, State) ->
case object_exists(Q, State) of
true -> record_sample_coarse({coarse, Id}, Args, State);
false -> io:format("Ignoring: ~p~n", [{Q, Args}]),
ok
end;
record_sample({coarse, Id}, Args, State) ->
record_sample0(Id, Args),
record_sample0({vhost_stats, vhost(Id, State)}, Args);
record_sample_coarse({coarse, Id}, Args, State);
%% Deliveries / acks (Q -> Ch)
record_sample({fine, {Ch, Q = #resource{kind = queue}}}, Args, State) ->
@ -655,11 +661,16 @@ record_sample({fine, {_Ch,
false -> ok
end.
%% We have to check the queue and exchange objects still exist for fine
%% stats since their deleted event could be overtaken by a channel stats
%% event which contains fine stats referencing them. That's also why we
%% don't need to check the channels exist - their deleted event can't be
%% overtaken by their own last stats event.
%% We have to check the queue and exchange objects still exist since
%% their deleted event could be overtaken by a channel stats event
%% which contains fine stats referencing them. That's also why we
%% don't need to check the channels exist - their deleted event can't
%% be overtaken by their own last stats event.
%%
%% Also, sometimes the queue_deleted event is not emitted by the queue
%% (in the nodedown case) - so it can overtake the final queue_stats
%% event (which is not *guaranteed* to be lost). So we make a similar
%% check for coarse queue stats.
%%
%% We can be sure that mnesia will be up to date by the time we receive
%% the event (even though we dirty read) since the deletions are
@ -685,6 +696,10 @@ record_sampleX(RenamePublishTo, X, {publish, Diff, TS, State}) ->
record_sampleX(_RenamePublishTo, X, {Type, Diff, TS, State}) ->
record_sample0({exchange_stats, X}, {Type, Diff, TS, State}).
record_sample_coarse({coarse, Id}, Args, State) ->
record_sample0(Id, Args),
record_sample0({vhost_stats, vhost(Id, State)}, Args).
record_sample0(Id0, {Key, Diff, TS, #state{aggregated_stats = ETS,
aggregated_stats_index = ETSi}}) ->
Id = {Id0, Key},