CQ property suite: Add publisher confirms

This commit is contained in:
Loïc Hoguin 2021-12-08 13:52:53 +01:00
parent 47f1198cb1
commit 16f1843725
No known key found for this signature in database
GPG Key ID: C69E26E3A9DF618F
1 changed files with 41 additions and 19 deletions

View File

@ -174,10 +174,10 @@ command(St) ->
ChannelCmds = case has_channels(St) of
false -> [];
true -> [
{100, {call, ?MODULE, cmd_channel_confirm_mode, [channel(St)]}},
{100, {call, ?MODULE, cmd_channel_close, [channel(St)]}},
{900, {call, ?MODULE, cmd_channel_publish, [St, channel(St), integer(0, 1024*1024), boolean(), expiration()]}},
% %% channel enable confirm mode
% {300, {call, ?MODULE, cmd_channel_await_publisher_confirms, [channel(St)]}},
{300, {call, ?MODULE, cmd_channel_wait_for_confirms, [channel(St)]}},
{300, {call, ?MODULE, cmd_channel_basic_get, [St, channel(St)]}},
{300, {call, ?MODULE, cmd_channel_consume, [St, channel(St)]}},
{100, {call, ?MODULE, cmd_channel_cancel, [St, channel(St)]}},
@ -244,16 +244,21 @@ next_state(St=#cq{q=Q}, Msg, {call, _, cmd_basic_get_msg, _}) ->
next_state(St, _, {call, _, cmd_purge, _}) ->
St#cq{q=#{}};
next_state(St=#cq{channels=Channels}, Ch, {call, _, cmd_channel_open, _}) ->
St#cq{channels=Channels#{Ch => idle}}; %% @todo A record instead of 'idle' | {'consume', Tag}?
St#cq{channels=Channels#{Ch => #{consumer => none, confirms => false}}};
next_state(St=#cq{channels=Channels}, _, {call, _, cmd_channel_close, [Ch]}) ->
%% @todo What about publisher confirms?
%% @todo What about messages we are currently in the process of receiving?
St#cq{channels=maps:remove(Ch, Channels)};
next_state(St=#cq{channels=Channels}, _, {call, _, cmd_channel_confirm_mode, [Ch]}) ->
ChInfo = maps:get(Ch, Channels),
St#cq{channels=Channels#{Ch => ChInfo#{confirms => true}}};
next_state(St=#cq{q=Q}, Msg, {call, _, cmd_channel_publish, [_, Ch|_]}) ->
%% @todo If in confirms mode, we need to keep track of things.
%% Otherwise just queue the message as normal.
ChQ = maps:get(Ch, Q, queue:new()),
St#cq{q=Q#{Ch => queue:in(Msg, ChQ)}};
next_state(St, _, {call, _, cmd_channel_wait_for_confirms, _}) ->
St;
%% @todo Special case 'empty' as an optimisation?
next_state(St=#cq{q=Q}, Msg, {call, _, cmd_channel_basic_get, _}) ->
%% When there are multiple active consumers we may receive
@ -269,9 +274,11 @@ next_state(St=#cq{q=Q}, Msg, {call, _, cmd_channel_basic_get, _}) ->
%% they are in the queue.
St#cq{q=queue_delete(Q, Msg)};
next_state(St=#cq{channels=Channels}, Tag, {call, _, cmd_channel_consume, [_, Ch]}) ->
St#cq{channels=Channels#{Ch => {consume, Tag}}};
ChInfo = maps:get(Ch, Channels),
St#cq{channels=Channels#{Ch => ChInfo#{consumer => Tag}}};
next_state(St=#cq{channels=Channels}, _, {call, _, cmd_channel_cancel, [_, Ch]}) ->
St#cq{channels=Channels#{Ch => idle}};
ChInfo = maps:get(Ch, Channels),
St#cq{channels=Channels#{Ch => ChInfo#{consumer => none}}};
next_state(St, none, {call, _, cmd_channel_receive_and_ack, _}) ->
St;
next_state(St=#cq{q=Q}, Msg, {call, _, cmd_channel_receive_and_ack, _}) ->
@ -313,25 +320,29 @@ queue_delete(Qs0, Msg) ->
%% Preconditions.
precondition(St, {call, _, cmd_channel_close, _}) ->
has_channels(St);
precondition(St, {call, _, cmd_channel_publish, _}) ->
has_channels(St);
precondition(#cq{channels=Channels}, {call, _, cmd_channel_confirm_mode, [Ch]}) ->
%% Only enabled confirms if they were not already enabled.
%% Otherwise it is a no-op so not a big problem but this
%% reduces the quality of the test runs.
maps:get(confirms, maps:get(Ch, Channels)) =:= false;
precondition(#cq{channels=Channels}, {call, _, cmd_channel_wait_for_confirms, [Ch]}) ->
%% Only wait for confirms when they were enabled.
maps:get(confirms, maps:get(Ch, Channels)) =:= true;
precondition(#cq{channels=Channels}, {call, _, cmd_channel_basic_get, [_, Ch]}) ->
%% Using both consume and basic_get is non-deterministic.
maps:get(Ch, Channels) =:= idle;
maps:get(consumer, maps:get(Ch, Channels)) =:= none;
precondition(#cq{channels=Channels}, {call, _, cmd_channel_consume, [_, Ch]}) ->
%% Don't consume if we are already consuming on this channel.
maps:get(Ch, Channels) =:= idle;
maps:get(consumer, maps:get(Ch, Channels)) =:= none;
precondition(#cq{channels=Channels}, {call, _, cmd_channel_cancel, [_, Ch]}) ->
%% Only cancel the consume when we are already consuming on this channel.
maps:get(Ch, Channels) =/= idle;
maps:get(consumer, maps:get(Ch, Channels)) =/= none;
precondition(#cq{channels=Channels}, {call, _, cmd_channel_receive_and_ack, [_, Ch]}) ->
%% Only receive and ack when we are already consuming on this channel.
maps:get(Ch, Channels) =/= idle;
maps:get(consumer, maps:get(Ch, Channels)) =/= none;
precondition(#cq{channels=Channels}, {call, _, cmd_channel_receive_and_reject, [_, Ch]}) ->
%% Only receive and reject when we are already consuming on this channel.
maps:get(Ch, Channels) =/= idle;
maps:get(consumer, maps:get(Ch, Channels)) =/= none;
precondition(_, _) ->
true.
@ -379,10 +390,14 @@ postcondition(_, {call, _, cmd_purge, _}, {ok, _}) ->
true;
postcondition(_, {call, _, cmd_channel_open, _}, _) ->
true;
postcondition(_, {call, _, cmd_channel_confirm_mode, _}, _) ->
true;
postcondition(_, {call, _, cmd_channel_close, _}, Res) ->
Res =:= ok;
postcondition(_, {call, _, cmd_channel_publish, _}, Msg) ->
is_record(Msg, amqp_msg);
postcondition(_, {call, _, cmd_channel_wait_for_confirms, _}, Res) ->
Res =:= true;
postcondition(St=#cq{q=Q}, {call, _, cmd_channel_basic_get, [_, Ch]}, empty) ->
%% We may get 'empty' if there are consumers and the messages are
%% in transit.
@ -438,8 +453,8 @@ postcondition(#cq{q=Q}, {call, _, cmd_channel_receive_and_reject, _}, Msg) ->
has_consumers(#cq{channels=Channels}) ->
maps:fold(fun
(_, {consume, _}, _) -> true;
(_, _, Acc) -> Acc
(_, #{consumer := none}, Acc) -> Acc;
(_, _, _) -> true
end, false, Channels).
queue_head_has_msg(Qs, Msg) ->
@ -593,6 +608,10 @@ cmd_channel_open(#cq{config=Config}) ->
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
Ch.
cmd_channel_confirm_mode(Ch) ->
#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
ok.
cmd_channel_close(Ch) ->
%% We cannot close the channel with
%% rabbit_ct_client_helpers:close_channel(Ch)
@ -613,6 +632,9 @@ cmd_channel_publish(#cq{amq=AMQ}, Ch, PayloadSize, Mandatory, Expiration) ->
Msg),
Msg.
cmd_channel_wait_for_confirms(Ch) ->
amqp_channel:wait_for_confirms(Ch, {1, second}).
cmd_channel_basic_get(#cq{amq=AMQ}, Ch) ->
#resource{name = Name} = amqqueue:get_name(AMQ),
case amqp_channel:call(Ch, #'basic.get'{queue = Name, no_ack = true}) of
@ -632,7 +654,7 @@ cmd_channel_consume(#cq{amq=AMQ}, Ch) ->
Tag.
cmd_channel_cancel(#cq{channels=Channels}, Ch) ->
{consume, Tag} = maps:get(Ch, Channels),
#{consumer := Tag} = maps:get(Ch, Channels),
#'basic.cancel_ok'{} =
amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = Tag}),
receive #'basic.cancel_ok'{consumer_tag = Tag} -> ok end,
@ -640,7 +662,7 @@ cmd_channel_cancel(#cq{channels=Channels}, Ch) ->
do_receive_reject_all(Ch, Tag).
cmd_channel_receive_and_ack(#cq{channels=Channels}, Ch) ->
{consume, Tag} = maps:get(Ch, Channels),
#{consumer := Tag} = maps:get(Ch, Channels),
receive
{#'basic.deliver'{consumer_tag = Tag,
delivery_tag = DeliveryTag}, Msg} ->
@ -651,7 +673,7 @@ cmd_channel_receive_and_ack(#cq{channels=Channels}, Ch) ->
end.
cmd_channel_receive_and_reject(#cq{channels=Channels}, Ch) ->
{consume, Tag} = maps:get(Ch, Channels),
#{consumer := Tag} = maps:get(Ch, Channels),
receive
{#'basic.deliver'{consumer_tag = Tag,
delivery_tag = DeliveryTag}, Msg} ->