Code to clean up queue process state when crashed
* Call `Mod:format_state/1`, if exported, to possibly truncate huge states
* Add more information about truncated `ram_pending_ack` and `disk_pending_ack`
* Add `log.error_logger_format_depth` cuttlefish schema value
* Add `format_state/1` to `rabbit_channel`
* Add `log.summarize_process_state` (default `false`) to enable summarizing process state for crash logs
* Add `format_state/1` to `rabbit_classic_queue_index_v2` and `rabbit_classic_queue_store_v2`
* Ensure `rabbit_channel:format_state/1` uses `summarize_process_state_when_logged`
* Do not set the `summarize_process_state_when_logged` value by default
This commit is contained in:
parent
e0bbd50322
commit
29e39c888b
|
@ -1674,6 +1674,24 @@ end}.
|
||||||
% Logging section
|
% Logging section
|
||||||
% ==========================
|
% ==========================
|
||||||
|
|
||||||
|
%% Maps the `log.summarize_process_state` setting (true | false) onto the
%% `summarize_process_state_when_logged` key of the `rabbit` application.
{mapping,
 "log.summarize_process_state",
 "rabbit.summarize_process_state_when_logged",
 [{datatype, {enum, [true, false]}}]}.
|
||||||
|
|
||||||
|
%% Maps `log.error_logger_format_depth` onto the kernel application's
%% `error_logger_format_depth` key. Accepted values are the atom
%% `unlimited` or an integer; the translation for this key narrows the
%% integer case further.
{mapping,
 "log.error_logger_format_depth",
 "kernel.error_logger_format_depth",
 [{datatype, [{atom, unlimited}, integer]}]}.
|
||||||
|
%% Computes the value stored under `kernel.error_logger_format_depth`:
%%   * setting absent or `unlimited` -> `unlimited`
%%   * positive integer              -> that integer
%%   * anything else                 -> rejected via cuttlefish:invalid/1
{translation, "kernel.error_logger_format_depth",
 fun(Conf) ->
     Setting = cuttlefish:conf_get("log.error_logger_format_depth", Conf, undefined),
     if
         Setting =:= undefined; Setting =:= unlimited ->
             unlimited;
         is_integer(Setting), Setting > 0 ->
             Setting;
         true ->
             cuttlefish:invalid("should be positive integer or 'unlimited'")
     end
 end}.
|
||||||
|
|
||||||
{mapping, "log.dir", "rabbit.log_root", [
|
{mapping, "log.dir", "rabbit.log_root", [
|
||||||
{datatype, string},
|
{datatype, string},
|
||||||
{validators, ["dir_writable"]}]}.
|
{validators, ["dir_writable"]}]}.
|
||||||
|
|
|
@ -24,7 +24,7 @@
|
||||||
-export([start_link/2]).
|
-export([start_link/2]).
|
||||||
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
|
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
|
||||||
handle_info/2, handle_pre_hibernate/1, prioritise_call/4,
|
handle_info/2, handle_pre_hibernate/1, prioritise_call/4,
|
||||||
prioritise_cast/3, prioritise_info/3, format_message_queue/2]).
|
prioritise_cast/3, prioritise_info/3, format_state/1, format_message_queue/2]).
|
||||||
-export([format/1]).
|
-export([format/1]).
|
||||||
-export([is_policy_applicable/2]).
|
-export([is_policy_applicable/2]).
|
||||||
|
|
||||||
|
@ -1746,6 +1746,9 @@ handle_pre_hibernate(State = #q{backing_queue = BQ,
|
||||||
#q.stats_timer),
|
#q.stats_timer),
|
||||||
{hibernate, stop_rate_timer(State1)}.
|
{hibernate, stop_rate_timer(State1)}.
|
||||||
|
|
||||||
|
%% Returns a copy of the queue process state suitable for logging.
%% Only the backing queue part of the state is touched; that work is
%% delegated to maybe_format_backing_queue_state/1.
%% Only accepts a #q{} record; anything else fails with function_clause.
format_state(State = #q{}) ->
    maybe_format_backing_queue_state(State).
|
||||||
|
|
||||||
format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ).
|
format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ).
|
||||||
|
|
||||||
%% TODO: this can be removed after 3.13
|
%% TODO: this can be removed after 3.13
|
||||||
|
@ -1787,3 +1790,13 @@ queue_created_infos(State) ->
|
||||||
%% On the events API, we use long names for queue types
|
%% On the events API, we use long names for queue types
|
||||||
Keys = ?CREATION_EVENT_KEYS -- [type],
|
Keys = ?CREATION_EVENT_KEYS -- [type],
|
||||||
infos(Keys, State) ++ [{type, rabbit_classic_queue}].
|
infos(Keys, State) ++ [{type, rabbit_classic_queue}].
|
||||||
|
|
||||||
|
%% Swaps the backing queue state held in #q{} for a loggable version.
%%
%% If the backing queue module exports format_state/1 its result is used;
%% otherwise the state is replaced by the atom
%% `backing_queue_state_truncated`.
%%
%% NOTE(review): the fallback drops the backing queue state without
%% looking at `summarize_process_state_when_logged` - confirm that
%% discarding it unconditionally is intended.
maybe_format_backing_queue_state(State = #q{backing_queue = Mod,
                                            backing_queue_state = BQState}) ->
    Formatted = case erlang:function_exported(Mod, format_state, 1) of
                    true  -> Mod:format_state(BQState);
                    false -> backing_queue_state_truncated
                end,
    State#q{backing_queue_state = Formatted}.
|
||||||
|
|
|
@ -60,7 +60,7 @@
|
||||||
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
|
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
|
||||||
handle_info/2, handle_pre_hibernate/1, handle_post_hibernate/1,
|
handle_info/2, handle_pre_hibernate/1, handle_post_hibernate/1,
|
||||||
prioritise_call/4, prioritise_cast/3, prioritise_info/3,
|
prioritise_call/4, prioritise_cast/3, prioritise_info/3,
|
||||||
format_message_queue/2]).
|
format_state/1, format_message_queue/2]).
|
||||||
|
|
||||||
%% Internal
|
%% Internal
|
||||||
-export([list_local/0, emit_info_local/3, deliver_reply_local/3]).
|
-export([list_local/0, emit_info_local/3, deliver_reply_local/3]).
|
||||||
|
@ -806,6 +806,17 @@ terminate(_Reason,
|
||||||
code_change(_OldVsn, State, _Extra) ->
|
code_change(_OldVsn, State, _Extra) ->
|
||||||
{ok, State}.
|
{ok, State}.
|
||||||
|
|
||||||
|
%% Returns the channel state in the form that should be logged.
%% Reads the `summarize_process_state_when_logged` key of the `rabbit`
%% application (default: false) and hands the decision to format_state/2.
format_state(#ch{} = State) ->
    Summarize = application:get_env(rabbit, summarize_process_state_when_logged, false),
    format_state(Summarize, State).
|
||||||
|
|
||||||
|
%% format_state(Summarize, State)
%%   Summarize =:= true  -> `unacked_message_q` is replaced by a UTF-8
%%                          binary that only mentions how many elements
%%                          the queue held.
%%   Summarize =:= false -> the state is returned untouched.
format_state(true, #ch{unacked_message_q = Unacked} = State) ->
    Summary = rabbit_data_coercion:to_utf8_binary(
                io_lib:format("unacked_message_q (~b elements) (truncated)",
                              [?QUEUE:len(Unacked)])),
    State#ch{unacked_message_q = Summary};
format_state(false, #ch{} = State) ->
    State.
|
||||||
|
|
||||||
format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ).
|
format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ).
|
||||||
|
|
||||||
get_consumer_timeout() ->
|
get_consumer_timeout() ->
|
||||||
|
|
|
@ -30,6 +30,10 @@
|
||||||
%% Shared with rabbit_classic_queue_store_v2.
|
%% Shared with rabbit_classic_queue_store_v2.
|
||||||
-export([queue_dir/2]).
|
-export([queue_dir/2]).
|
||||||
|
|
||||||
|
%% Used to format the state and summarize large amounts of data in
|
||||||
|
%% the state.
|
||||||
|
-export([format_state/1]).
|
||||||
|
|
||||||
%% Internal. Used by tests.
|
%% Internal. Used by tests.
|
||||||
-export([segment_file/2]).
|
-export([segment_file/2]).
|
||||||
|
|
||||||
|
@ -1285,3 +1289,15 @@ write_file_and_ensure_dir(Name, IOData) ->
|
||||||
end;
|
end;
|
||||||
Err -> Err
|
Err -> Err
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
%% Produces a condensed copy of the index state for logging:
%%   * `write_buffer` and `cache` keep only their map keys;
%%   * `confirms` is replaced by a short binary stating the set size.
format_state(State = #qi{write_buffer = Buffer,
                         cache = Cached,
                         confirms = Pending}) ->
    PendingCount = sets:size(Pending),
    BufferKeys = maps:keys(Buffer),
    CacheKeys = maps:keys(Cached),
    State#qi{write_buffer = BufferKeys,
             cache = CacheKeys,
             confirms = format_state_element(confirms, PendingCount)}.
|
||||||
|
|
||||||
|
%% Builds the placeholder stored in place of a large state element:
%% a UTF-8 binary of the form "<Name> (<Count> elements) (truncated)".
%% Name must be an atom and Count an integer.
format_state_element(Name, Count) when is_atom(Name), is_integer(Count) ->
    Text = io_lib:format("~tp (~b elements) (truncated)", [Name, Count]),
    rabbit_data_coercion:to_utf8_binary(Text).
|
||||||
|
|
|
@ -46,7 +46,7 @@
|
||||||
|
|
||||||
-export([init/1, terminate/1, info/1,
|
-export([init/1, terminate/1, info/1,
|
||||||
write/4, sync/1, read/3, read_many/2, check_msg_on_disk/3,
|
write/4, sync/1, read/3, read_many/2, check_msg_on_disk/3,
|
||||||
remove/2, delete_segments/2]).
|
remove/2, delete_segments/2, format_state/1]).
|
||||||
|
|
||||||
-define(SEGMENT_EXTENSION, ".qs").
|
-define(SEGMENT_EXTENSION, ".qs").
|
||||||
|
|
||||||
|
@ -572,3 +572,8 @@ check_crc32() ->
|
||||||
segment_file(Segment, #qs{dir = Dir}) ->
|
segment_file(Segment, #qs{dir = Dir}) ->
|
||||||
N = integer_to_binary(Segment),
|
N = integer_to_binary(Segment),
|
||||||
<<Dir/binary, N/binary, ?SEGMENT_EXTENSION>>.
|
<<Dir/binary, N/binary, ?SEGMENT_EXTENSION>>.
|
||||||
|
|
||||||
|
%% Produces a condensed copy of the store state for logging: the
%% `write_buffer` and `cache` maps are reduced to their lists of keys.
format_state(State = #qs{write_buffer = Buffer, cache = Cached}) ->
    BufferKeys = maps:keys(Buffer),
    CacheKeys = maps:keys(Cached),
    State#qs{write_buffer = BufferKeys,
             cache = CacheKeys}.
|
||||||
|
|
|
@ -34,7 +34,8 @@
|
||||||
handle_pre_hibernate/1, resume/1, msg_rates/1,
|
handle_pre_hibernate/1, resume/1, msg_rates/1,
|
||||||
info/2, invoke/3, is_duplicate/2, set_queue_mode/2,
|
info/2, invoke/3, is_duplicate/2, set_queue_mode/2,
|
||||||
set_queue_version/2,
|
set_queue_version/2,
|
||||||
zip_msgs_and_acks/4]).
|
zip_msgs_and_acks/4,
|
||||||
|
format_state/1]).
|
||||||
|
|
||||||
-record(state, {bq, bqss, max_priority}).
|
-record(state, {bq, bqss, max_priority}).
|
||||||
-record(passthrough, {bq, bqs}).
|
-record(passthrough, {bq, bqs}).
|
||||||
|
@ -663,3 +664,12 @@ zip_msgs_and_acks(Pubs, AckTags) ->
|
||||||
Id = mc:get_annotation(id, Msg),
|
Id = mc:get_annotation(id, Msg),
|
||||||
{Id, AckTag}
|
{Id, AckTag}
|
||||||
end, Pubs, AckTags).
|
end, Pubs, AckTags).
|
||||||
|
|
||||||
|
%% Returns a version of the backing queue state suitable for logging.
%%
%% #passthrough{}: the wrapped state is formatted by the wrapped backing
%% queue module when it exports format_state/1; otherwise it is replaced
%% by the atom `passthrough_bqs_truncated`.
%%
%% #state{}: returned unchanged. This module declares both records as
%% state shapes, so the clause is needed to keep this function from
%% failing with function_clause while a crash report is being built.
%% NOTE(review): the per-priority states in `bqss` are left unformatted
%% because their layout is not visible here - format them individually
%% once that layout is confirmed.
format_state(S = #passthrough{bq = BQ, bqs = BQS0}) ->
    case erlang:function_exported(BQ, format_state, 1) of
        true ->
            BQS1 = BQ:format_state(BQS0),
            S#passthrough{bqs = BQS1};
        _ ->
            S#passthrough{bqs = passthrough_bqs_truncated}
    end;
format_state(S = #state{}) ->
    S.
|
||||||
|
|
|
@ -16,7 +16,8 @@
|
||||||
update_rates/1, needs_timeout/1, timeout/1,
|
update_rates/1, needs_timeout/1, timeout/1,
|
||||||
handle_pre_hibernate/1, resume/1, msg_rates/1,
|
handle_pre_hibernate/1, resume/1, msg_rates/1,
|
||||||
info/2, invoke/3, is_duplicate/2, set_queue_mode/2,
|
info/2, invoke/3, is_duplicate/2, set_queue_mode/2,
|
||||||
set_queue_version/2, zip_msgs_and_acks/4]).
|
set_queue_version/2, zip_msgs_and_acks/4,
|
||||||
|
format_state/1]).
|
||||||
|
|
||||||
-export([start/2, stop/1]).
|
-export([start/2, stop/1]).
|
||||||
|
|
||||||
|
@ -2435,3 +2436,22 @@ maybe_client_terminate(MSCStateP) ->
|
||||||
_:_ ->
|
_:_ ->
|
||||||
ok
|
ok
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
%% Returns the variable queue state in the form that should be logged.
%% Reads the `summarize_process_state_when_logged` key of the `rabbit`
%% application (default: false) and hands the decision to format_state/2.
format_state(#vqstate{} = State) ->
    Summarize = application:get_env(rabbit, summarize_process_state_when_logged, false),
    format_state(Summarize, State).
|
||||||
|
|
||||||
|
%% format_state(Summarize, State)
%%   Summarize =:= false -> the state is returned untouched.
%%   Summarize =:= true  -> the bulky fields are condensed:
%%       q3               -> list of sequence ids (see format_q3/1)
%%       ram_pending_ack  -> map keys only
%%       disk_pending_ack -> map keys only
%%       index_state      -> rabbit_classic_queue_index_v2:format_state/1
%%       store_state      -> rabbit_classic_queue_store_v2:format_state/1
%%
%% NOTE(review): the v2 index/store formatters are called unconditionally;
%% confirm index_state/store_state can never hold another implementation's
%% state when this runs.
format_state(true, State = #vqstate{q3 = Q3,
                                    ram_pending_ack = RamAcks,
                                    disk_pending_ack = DiskAcks,
                                    index_state = Index,
                                    store_state = Store}) ->
    State#vqstate{q3 = format_q3(Q3),
                  ram_pending_ack = maps:keys(RamAcks),
                  disk_pending_ack = maps:keys(DiskAcks),
                  index_state = rabbit_classic_queue_index_v2:format_state(Index),
                  store_state = rabbit_classic_queue_store_v2:format_state(Store)};
format_state(false, State = #vqstate{}) ->
    State.
|
||||||
|
|
||||||
|
%% Reduces q3 to the list of sequence ids of the #msg_status{} entries it
%% contains, in queue order. Entries that are not #msg_status{} records
%% are skipped.
format_q3(Q3) ->
    lists:filtermap(fun(#msg_status{seq_id = SeqId}) -> {true, SeqId};
                       (_Other) -> false
                    end,
                    ?QUEUE:to_list(Q3)).
|
||||||
|
|
|
@ -1150,14 +1150,15 @@ print_event(Dev, Event, Name) ->
|
||||||
|
|
||||||
terminate(Reason, Msg, #gs2_state { name = Name,
|
terminate(Reason, Msg, #gs2_state { name = Name,
|
||||||
mod = Mod,
|
mod = Mod,
|
||||||
state = State,
|
state = ModState0,
|
||||||
debug = Debug,
|
debug = Debug,
|
||||||
stop_stats_fun = StopStatsFun
|
stop_stats_fun = StopStatsFun
|
||||||
} = GS2State) ->
|
} = GS2State) ->
|
||||||
StopStatsFun(stop_stats_timer(GS2State)),
|
StopStatsFun(stop_stats_timer(GS2State)),
|
||||||
case catch Mod:terminate(Reason, State) of
|
case catch Mod:terminate(Reason, ModState0) of
|
||||||
{'EXIT', R} ->
|
{'EXIT', R} ->
|
||||||
error_info(R, Reason, Name, Msg, State, Debug),
|
ModState1 = maybe_format_state(Mod, ModState0),
|
||||||
|
error_info(R, Reason, Name, Msg, ModState1, Debug),
|
||||||
exit(R);
|
exit(R);
|
||||||
_ ->
|
_ ->
|
||||||
case Reason of
|
case Reason of
|
||||||
|
@ -1168,17 +1169,26 @@ terminate(Reason, Msg, #gs2_state { name = Name,
|
||||||
{shutdown,_}=Shutdown ->
|
{shutdown,_}=Shutdown ->
|
||||||
exit(Shutdown);
|
exit(Shutdown);
|
||||||
_ ->
|
_ ->
|
||||||
error_info(Reason, undefined, Name, Msg, State, Debug),
|
ModState1 = maybe_format_state(Mod, ModState0),
|
||||||
|
error_info(Reason, undefined, Name, Msg, ModState1, Debug),
|
||||||
exit(Reason)
|
exit(Reason)
|
||||||
end
|
end
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
%% Gives the callback module a chance to shrink or redact its state
%% before it is written to the termination error report.
%%
%% M        - callback module of the terminating server
%% ModState - the callback module's state
%%
%% Returns M:format_state(ModState) when M exports format_state/1,
%% otherwise ModState unchanged.
%%
%% This runs while the server is already terminating. An exception raised
%% by format_state/1 must not escape: it would suppress the error report
%% and replace the real exit reason. In that case the unformatted state
%% is logged instead.
maybe_format_state(M, ModState) ->
    case erlang:function_exported(M, format_state, 1) of
        true ->
            try
                M:format_state(ModState)
            catch
                _:_ ->
                    ModState
            end;
        false ->
            ModState
    end.
|
||||||
|
|
||||||
error_info(_Reason, _RootCause, application_controller, _Msg, _State, _Debug) ->
|
error_info(_Reason, _RootCause, application_controller, _Msg, _State, _Debug) ->
|
||||||
%% OTP-5811 Don't send an error report if it's the system process
|
%% OTP-5811 Don't send an error report if it's the system process
|
||||||
%% application_controller which is terminating - let init take care
|
%% application_controller which is terminating - let init take care
|
||||||
%% of it instead
|
%% of it instead
|
||||||
ok;
|
ok;
|
||||||
error_info(Reason, RootCause, Name, Msg, State, Debug) ->
|
error_info(Reason, RootCause, Name, Msg, ModState, Debug) ->
|
||||||
Reason1 = error_reason(Reason),
|
Reason1 = error_reason(Reason),
|
||||||
Fmt =
|
Fmt =
|
||||||
"** Generic server ~tp terminating~n"
|
"** Generic server ~tp terminating~n"
|
||||||
|
@ -1186,10 +1196,10 @@ error_info(Reason, RootCause, Name, Msg, State, Debug) ->
|
||||||
"** When Server state == ~tp~n"
|
"** When Server state == ~tp~n"
|
||||||
"** Reason for termination == ~n** ~tp~n",
|
"** Reason for termination == ~n** ~tp~n",
|
||||||
case RootCause of
|
case RootCause of
|
||||||
undefined -> format(Fmt, [Name, Msg, State, Reason1]);
|
undefined -> format(Fmt, [Name, Msg, ModState, Reason1]);
|
||||||
_ -> format(Fmt ++ "** In 'terminate' callback "
|
_ -> format(Fmt ++ "** In 'terminate' callback "
|
||||||
"with reason ==~n** ~tp~n",
|
"with reason ==~n** ~tp~n",
|
||||||
[Name, Msg, State, Reason1,
|
[Name, Msg, ModState, Reason1,
|
||||||
error_reason(RootCause)])
|
error_reason(RootCause)])
|
||||||
end,
|
end,
|
||||||
sys:print_log(Debug),
|
sys:print_log(Debug),
|
||||||
|
|
Loading…
Reference in New Issue