Fix a consistency issue in the v1 index after dirty restarts

The issue was found via classic_queue_SUITE using a currently
disabled command that kills the queue. A function has also
been added that converts a set of commands returned by PropEr
into Erlang code that can be pasted into the `do_manual`
function. Some tips have also been added.

The Erlang code that could reproduce the issue follows.
The issue never needed a loop on my machine for what it's
worth, but it might on other machines. Commands that were
not necessary were commented out. The timer:sleep(1) calls
were added as the issue did not seem to trigger without them.

%% Builds the initial #cq{} state for the lazy, version 1 queue, creates the
%% queue with cmd_setup_queue/1, stores the result in the `amq` field and
%% hands the resulting state over to do_manual_loop/1.
do_manual(Config) ->
    Initial = #cq{name=prop_classic_queue_v1, mode=lazy, version=1,
                  config=minimal_config(Config)},
    Queue = cmd_setup_queue(Initial),
    do_manual_loop(Initial#cq{amq=Queue}).

%% Replays the command sequence that reproduced the issue and then calls
%% itself with the final state, so the sequence repeats with no exit
%% condition of its own.
%%
%% Every active step follows the same three-line shape: run the command,
%% assert its postcondition with a `true =` match (a failed postcondition
%% crashes with badmatch), then compute the next model state.
%%
%% Commands that turned out not to be necessary are left commented out;
%% for those steps the state is simply passed through (StN = StN-1) so the
%% numbering of the remaining steps is unchanged.
%%
%% The timer:sleep(1) calls between steps are deliberate: the issue did
%% not seem to trigger without them.
do_manual_loop(St1) ->

%    Res2 = cmd_set_mode(St1, lazy),
%    true = postcondition(St1, {call, undefined, cmd_set_mode, [St1, lazy]}, Res2),
%    St2 = next_state(St1, Res2, {call, undefined, cmd_set_mode, [St1, lazy]}),
    St2 = St1,

timer:sleep(1),

%    Res3 = cmd_basic_get_msg(St2),
%    true = postcondition(St2, {call, undefined, cmd_basic_get_msg, [St2]}, Res3),
%    St3 = next_state(St2, Res3, {call, undefined, cmd_basic_get_msg, [St2]}),
    St3 = St2,

timer:sleep(1),

    %% Res4 is passed to the cmd_channel_publish/5 call below.
    Res4 = cmd_channel_open(St3),
    true = postcondition(St3, {call, undefined, cmd_channel_open, [St3]}, Res4),
    St4 = next_state(St3, Res4, {call, undefined, cmd_channel_open, [St3]}),

timer:sleep(1),

    Res5 = cmd_channel_publish(St4, Res4, 22, false, undefined),
    true = postcondition(St4, {call, undefined, cmd_channel_publish, [St4, Res4, 22, false, undefined]}, Res5),
    St5 = next_state(St4, Res5, {call, undefined, cmd_channel_publish, [St4, Res4, 22, false, undefined]}),

timer:sleep(1),

    Res6 = cmd_restart_vhost_clean(St5),
    true = postcondition(St5, {call, undefined, cmd_restart_vhost_clean, [St5]}, Res6),
    St6 = next_state(St5, Res6, {call, undefined, cmd_restart_vhost_clean, [St5]}),

timer:sleep(1),

%    Res7 = cmd_channel_publish(St6, Res4, 13, false, 71),
%    true = postcondition(St6, {call, undefined, cmd_channel_publish, [St6, Res4, 13, false, 71]}, Res7),
%    St7 = next_state(St6, Res7, {call, undefined, cmd_channel_publish, [St6, Res4, 13, false, 71]}),
    St7 = St6,

timer:sleep(1),

%    Res8 = cmd_channel_open(St7),
%    true = postcondition(St7, {call, undefined, cmd_channel_open, [St7]}, Res8),
%    St8 = next_state(St7, Res8, {call, undefined, cmd_channel_open, [St7]}),
    St8 = St7,

timer:sleep(1),

%    Res9 = cmd_channel_close(Res8),
%    true = postcondition(St8, {call, undefined, cmd_channel_close, [Res8]}, Res9),
%    St9 = next_state(St8, Res9, {call, undefined, cmd_channel_close, [Res8]}),
    St9 = St8,

timer:sleep(1),

%    Res10 = cmd_channel_close(Res4),
%    true = postcondition(St9, {call, undefined, cmd_channel_close, [Res4]}, Res10),
%    St10 = next_state(St9, Res10, {call, undefined, cmd_channel_close, [Res4]}),
    St10 = St9,

timer:sleep(1),

    %% The channel opened as Res4 is still open here (its close is
    %% commented out above) when the dirty queue restart is issued.
    Res11 = cmd_restart_queue_dirty(St10),
    true = postcondition(St10, {call, undefined, cmd_restart_queue_dirty, [St10]}, Res11),
    St11 = next_state(St10, Res11, {call, undefined, cmd_restart_queue_dirty, [St10]}),

timer:sleep(1),

    %% Two cmd_restart_vhost_clean/1 calls follow the dirty restart.
    Res12 = cmd_restart_vhost_clean(St11),
    true = postcondition(St11, {call, undefined, cmd_restart_vhost_clean, [St11]}, Res12),
    St12 = next_state(St11, Res12, {call, undefined, cmd_restart_vhost_clean, [St11]}),

timer:sleep(1),

%    Res13 = cmd_set_version(St12, 1),
%    true = postcondition(St12, {call, undefined, cmd_set_version, [St12, 1]}, Res13),
%    St13 = next_state(St12, Res13, {call, undefined, cmd_set_version, [St12, 1]}),
    St13 = St12,

timer:sleep(1),

    Res14 = cmd_restart_vhost_clean(St13),
    true = postcondition(St13, {call, undefined, cmd_restart_vhost_clean, [St13]}, Res14),
    St14 = next_state(St13, Res14, {call, undefined, cmd_restart_vhost_clean, [St13]}),

timer:sleep(1),
%% Logged once per completed iteration, just before the next one starts.
logger:error("loop~n"),

    %% Repeat the whole sequence starting from the state reached above.
    do_manual_loop(St14).
This commit is contained in:
Loïc Hoguin 2022-01-25 11:10:24 +01:00
parent c352525e0c
commit 777e0fd6ea
No known key found for this signature in database
GPG Key ID: C69E26E3A9DF618F
2 changed files with 72 additions and 12 deletions

View File

@ -683,8 +683,15 @@ init_dirty(CleanShutdown, ContainsCheckFun, State, Context) ->
CountAcc + UnackedCount,
BytesAcc + UnackedBytes, DirtyCount + Dirty}
end, {Segments, 0, 0, 0}, all_segment_nums(State1)),
State2 = maybe_flush_journal(State1 #qistate { segments = Segments1,
dirty_count = DirtyCount }),
%% We force flush the journal to avoid getting into a bad state
%% when the node gets shut down immediately after init. It takes
%% a few restarts for the problem to materialize itself, with
%% at least one message published, followed by the process crashing,
%% followed by a recovery that is dirty due to term mismatch in the
%% message store, followed by two clean recoveries. This last
%% recovery fails with a crash.
State2 = flush_journal(State1 #qistate { segments = Segments1,
dirty_count = DirtyCount }),
case Context of
convert ->
{Count, Bytes, State2};
@ -789,6 +796,7 @@ recover_segment(ContainsCheckFun, CleanShutdown,
{MsgOrId, MsgProps} = parse_pub_record_body(Bin, MsgBin),
{recover_message(ContainsCheckFun(MsgOrId), CleanShutdown,
Del, RelSeq, SegmentAndDirtyCount, MaxJournal),
%% @todo If the message is dropped we shouldn't add the size?
Bytes + case IsPersistent of
true -> MsgProps#message_properties.size;
false -> 0

View File

@ -107,6 +107,61 @@ end_per_group(classic_queue_tests, Config) ->
rabbit_ct_client_helpers:teardown_steps() ++
rabbit_ct_broker_helpers:teardown_steps()).
%% Prints Erlang source for a do_manual/1 function that replays a list of
%% PropEr instructions. The argument is a one-element list wrapping the
%% instruction list; the text is written with io:format/1,2.
%%
%% How to use it:
%%   $ make -C deps/rabbit test-build
%%   $ erl -pa deps/rabbit/test
%%   > classic_queue_SUITE:instrs_to_manual([[{init,...},...]]).
%%   Paste the output into do_manual/1.
%%   Enable manual as the only test in groups/0.
%%   $ make -C deps/rabbit ct-classic_queue
%%
%% For the instruction bound to {var, N} the generated code names the
%% command result ResN, reads the state StN-1 and binds the new state StN.
instrs_to_manual([Commands]) ->
    ResName = fun(N) -> "Res" ++ integer_to_list(N) end,
    StName = fun(N) -> "St" ++ integer_to_list(N) end,
    %% A symbolic {var, N} argument refers to an earlier result; any other
    %% argument is printed as a literal term.
    ShowArg = fun
        ({var, N}) -> ResName(N);
        (Term) -> io_lib:format("~0p", [Term])
    end,
    Emit = fun
        ({init, InitState}) ->
            #cq{name=QName, mode=QMode, version=QVersion} = InitState,
            io:format(" St0 = #cq{name=~0p, mode=~0p, version=~0p,~n"
                      " config=minimal_config(Config)},~n~n",
                      [QName, QMode, QVersion]);
        %% Queue setup only stores its result in the state record; no
        %% postcondition or next_state call is generated for it.
        ({set, {var, N}, {call, ?MODULE, cmd_setup_queue, _}}) ->
            Result = ResName(N),
            Before = StName(N - 1),
            After = StName(N),
            io:format(" ~s = cmd_setup_queue(~s),~n"
                      " ~s = ~s#cq{amq=~s},~n~n",
                      [Result, Before, After, Before, Result]);
        %% Commands whose first argument is the state record: the previous
        %% state variable is printed first and every remaining argument is
        %% prefixed with ", ".
        ({set, {var, N}, {call, ?MODULE, Command, [#cq{} | Rest]}}) ->
            Result = ResName(N),
            Before = StName(N - 1),
            After = StName(N),
            Trailing = [[", ", ShowArg(Arg)] || Arg <- Rest],
            io:format(" ~s = ~s(~s~s),~n"
                      " true = postcondition(~s, {call, undefined, ~s, [~s~s]}, ~s),~n"
                      " ~s = next_state(~s, ~s, {call, undefined, ~s, [~s~s]}),~n~n",
                      [Result, Command, Before, Trailing,
                       Before, Command, Before, Trailing, Result,
                       After, Before, Result, Command, Before, Trailing]);
        %% Any other command: all arguments are printed, comma-separated.
        ({set, {var, N}, {call, ?MODULE, Command, Params}}) ->
            Result = ResName(N),
            Before = StName(N - 1),
            After = StName(N),
            Joined = lists:join(", ", [ShowArg(Arg) || Arg <- Params]),
            io:format(" ~s = ~s(~s),~n"
                      " true = postcondition(~s, {call, undefined, ~s, [~s]}, ~s),~n"
                      " ~s = next_state(~s, ~s, {call, undefined, ~s, [~s]}),~n~n",
                      [Result, Command, Joined,
                       Before, Command, Joined, Result,
                       After, Before, Result, Command, Joined])
    end,
    io:format("~ndo_manual(Config) ->~n~n"),
    lists:foreach(Emit, Commands),
    io:format(" true.~n").
manual(Config) ->
%% This is where tracing of client processes
%% (amqp_channel, amqp_selective_consumer)
@ -114,17 +169,14 @@ manual(Config) ->
true = rabbit_ct_broker_helpers:rpc(Config, 0,
?MODULE, do_manual, [Config]).
%% Tips to help reproduce an issue:
%% - See instrs_to_manual/1 to automatically obtain code to put in this function.
%% - Do the commands after cmd_setup_queue in a loop.
%% - Add some timer:sleep(1) or longer between commands if delays are necessary.
%% - If a shrunk set of commands isn't good enough, the original set might be.
%% - Removing some steps can help understand the sequence of events leading to the problem.
do_manual(Config) ->
InitialState = #cq{name=?FUNCTION_NAME, mode=default, version=1,
config=minimal_config(Config)},
AMQ0 = cmd_setup_queue(InitialState),
State = InitialState#cq{amq=AMQ0},
true = is_record(State, cq),
%% This is where tracing of server processes
%% should be added if necessary.
%%
%% Insert commands here to reproduce the issue.
true.
Config =:= Config.
classic_queue_v1(Config) ->
true = rabbit_ct_broker_helpers:rpc(Config, 0,