CQv2 prop: Clean restarts don't drop transient messages

This commit is contained in:
Loïc Hoguin 2022-02-04 15:22:25 +01:00
parent 9e59c6a698
commit 4a7cce831c
No known key found for this signature in database
GPG Key ID: C69E26E3A9DF618F
1 changed files with 3 additions and 4 deletions

View File

@ -349,10 +349,9 @@ next_state(St=#cq{q=Q0, confirmed=Confirmed, uncertain=Uncertain0, channels=Chan
%% were received.
{Uncertain, Q} = maps:fold(fun(Ch, ChQ, {UncertainAcc, QAcc}) ->
ChQL = queue:to_list(ChQ),
%% We only keep persistent messages because on restart the queue acks the transients.
%% We canceled all consumers so there is no risk of receiving a transient in transit.
%% We keep both transient and persistent messages on clean restarts.
ChQConfirmed = [Msg || Msg <- ChQL, lists:member(Msg, Confirmed)],
ChQUncertain = [Msg || Msg = #amqp_msg{props=#'P_basic'{delivery_mode=2}} <- ChQL, not lists:member(Msg, Confirmed)],
ChQUncertain = [Msg || Msg <- ChQL, not lists:member(Msg, Confirmed)],
{ChQUncertain ++ UncertainAcc, QAcc#{Ch => queue:from_list(ChQConfirmed)}}
end, {Uncertain0, #{}}, Q0),
St#cq{amq=AMQ, q=Q, restarted=true, uncertain=Uncertain, channels=Channels};
@ -362,7 +361,7 @@ next_state(St=#cq{q=Q0, confirmed=Confirmed, uncertain=Uncertain0}, AMQ, {call,
%% were received.
{Uncertain, Q} = maps:fold(fun(Ch, ChQ, {UncertainAcc, QAcc}) ->
ChQL = queue:to_list(ChQ),
%% We only keep persistent messages because on restart the queue acks the transients.
%% We only keep persistent messages because on dirty restart the queue acks the transients.
ChQConfirmed = [Msg || Msg <- ChQL, lists:member(Msg, Confirmed)],
%% We keep both persistent and transient in Uncertain because there might
%% be messages in transit that will be received by consumers.