Revert "Remove variable_queue_fold test flake"
This reverts commit 229ac6bfb7.
Fix syntax errors
This commit is contained in:
parent
cc27bb9521
commit
9236dc2365
|
|
@ -42,6 +42,7 @@
|
||||||
variable_queue_purge,
|
variable_queue_purge,
|
||||||
variable_queue_requeue,
|
variable_queue_requeue,
|
||||||
variable_queue_requeue_ram_beta,
|
variable_queue_requeue_ram_beta,
|
||||||
|
variable_queue_fold,
|
||||||
variable_queue_batch_publish,
|
variable_queue_batch_publish,
|
||||||
variable_queue_batch_publish_delivered
|
variable_queue_batch_publish_delivered
|
||||||
]).
|
]).
|
||||||
|
|
@ -175,7 +176,8 @@ orelse Group =:= backing_queue_embed_limit_1024 ->
|
||||||
end_per_group1(_, Config) ->
|
end_per_group1(_, Config) ->
|
||||||
Config.
|
Config.
|
||||||
|
|
||||||
init_per_testcase(Testcase, Config) when Testcase == variable_queue_requeue ->
|
init_per_testcase(Testcase, Config) when Testcase == variable_queue_requeue;
|
||||||
|
Testcase == variable_queue_fold ->
|
||||||
ok = rabbit_ct_broker_helpers:rpc(
|
ok = rabbit_ct_broker_helpers:rpc(
|
||||||
Config, 0, application, set_env,
|
Config, 0, application, set_env,
|
||||||
[rabbit, queue_explicit_gc_run_operation_threshold, 0]),
|
[rabbit, queue_explicit_gc_run_operation_threshold, 0]),
|
||||||
|
|
@ -183,7 +185,8 @@ init_per_testcase(Testcase, Config) when Testcase == variable_queue_requeue ->
|
||||||
init_per_testcase(Testcase, Config) ->
|
init_per_testcase(Testcase, Config) ->
|
||||||
rabbit_ct_helpers:testcase_started(Config, Testcase).
|
rabbit_ct_helpers:testcase_started(Config, Testcase).
|
||||||
|
|
||||||
end_per_testcase(Testcase, Config) when Testcase == variable_queue_requeue ->
|
end_per_testcase(Testcase, Config) when Testcase == variable_queue_requeue;
|
||||||
|
Testcase == variable_queue_fold ->
|
||||||
ok = rabbit_ct_broker_helpers:rpc(
|
ok = rabbit_ct_broker_helpers:rpc(
|
||||||
Config, 0, application, set_env,
|
Config, 0, application, set_env,
|
||||||
[rabbit, queue_explicit_gc_run_operation_threshold, 1000]),
|
[rabbit, queue_explicit_gc_run_operation_threshold, 1000]),
|
||||||
|
|
@ -1147,6 +1150,39 @@ variable_queue_requeue_ram_beta2(VQ0, _Config) ->
|
||||||
{_, VQ8} = rabbit_variable_queue:ack(AcksAll, VQ7),
|
{_, VQ8} = rabbit_variable_queue:ack(AcksAll, VQ7),
|
||||||
VQ8.
|
VQ8.
|
||||||
|
|
||||||
|
variable_queue_fold(Config) ->
|
||||||
|
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
|
||||||
|
?MODULE, variable_queue_fold1, [Config]).
|
||||||
|
|
||||||
|
variable_queue_fold1(Config) ->
|
||||||
|
with_fresh_variable_queue(
|
||||||
|
fun variable_queue_fold2/2,
|
||||||
|
?config(variable_queue_type, Config)).
|
||||||
|
|
||||||
|
variable_queue_fold2(VQ0, _Config) ->
|
||||||
|
{PendingMsgs, RequeuedMsgs, FreshMsgs, VQ1} =
|
||||||
|
variable_queue_with_holes(VQ0),
|
||||||
|
Count = rabbit_variable_queue:depth(VQ1),
|
||||||
|
Msgs = lists:sort(PendingMsgs ++ RequeuedMsgs ++ FreshMsgs),
|
||||||
|
lists:foldl(fun (Cut, VQ2) ->
|
||||||
|
test_variable_queue_fold(Cut, Msgs, PendingMsgs, VQ2)
|
||||||
|
end, VQ1, [0, 1, 2, Count div 2,
|
||||||
|
Count - 1, Count, Count + 1, Count * 2]).
|
||||||
|
|
||||||
|
test_variable_queue_fold(Cut, Msgs, PendingMsgs, VQ0) ->
|
||||||
|
{Acc, VQ1} = rabbit_variable_queue:fold(
|
||||||
|
fun (M, _, Pending, A) ->
|
||||||
|
MInt = msg2int(M),
|
||||||
|
Pending = lists:member(MInt, PendingMsgs), %% assert
|
||||||
|
case MInt =< Cut of
|
||||||
|
true -> {cont, [MInt | A]};
|
||||||
|
false -> {stop, A}
|
||||||
|
end
|
||||||
|
end, [], VQ0),
|
||||||
|
Expected = lists:takewhile(fun (I) -> I =< Cut end, Msgs),
|
||||||
|
Expected = lists:reverse(Acc), %% assertion
|
||||||
|
VQ1.
|
||||||
|
|
||||||
variable_queue_batch_publish(Config) ->
|
variable_queue_batch_publish(Config) ->
|
||||||
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
|
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
|
||||||
?MODULE, variable_queue_batch_publish1, [Config]).
|
?MODULE, variable_queue_batch_publish1, [Config]).
|
||||||
|
|
|
||||||
|
|
@ -94,7 +94,8 @@ end_per_testcase(confirms_rejects_conflict = Testcase, Config) ->
|
||||||
end_per_testcase0(Testcase, Config);
|
end_per_testcase0(Testcase, Config);
|
||||||
end_per_testcase(dead_queue_rejects = Testcase, Config) ->
|
end_per_testcase(dead_queue_rejects = Testcase, Config) ->
|
||||||
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
|
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
|
||||||
amqp_channel:call(Ch, #'queue.delete'{queue = <<"dead_queue_rejects">>});
|
amqp_channel:call(Ch, #'queue.delete'{queue = <<"dead_queue_rejects">>}),
|
||||||
|
end_per_testcase0(Testcase, Config);
|
||||||
end_per_testcase(mixed_dead_alive_queues_reject = Testcase, Config) ->
|
end_per_testcase(mixed_dead_alive_queues_reject = Testcase, Config) ->
|
||||||
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
|
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
|
||||||
amqp_channel:call(Ch, #'queue.delete'{queue = <<"mixed_dead_alive_queues_reject_dead">>}),
|
amqp_channel:call(Ch, #'queue.delete'{queue = <<"mixed_dead_alive_queues_reject_dead">>}),
|
||||||
|
|
@ -144,6 +145,7 @@ dead_queue_rejects(Config) ->
|
||||||
{'basic.nack',_,_,_} -> ok
|
{'basic.nack',_,_,_} -> ok
|
||||||
after 10000 ->
|
after 10000 ->
|
||||||
error(timeout_waiting_for_nack)
|
error(timeout_waiting_for_nack)
|
||||||
|
end.
|
||||||
|
|
||||||
mixed_dead_alive_queues_reject(Config) ->
|
mixed_dead_alive_queues_reject(Config) ->
|
||||||
Conn = ?config(conn, Config),
|
Conn = ?config(conn, Config),
|
||||||
|
|
@ -184,14 +186,14 @@ mixed_dead_alive_queues_reject(Config) ->
|
||||||
|
|
||||||
kill_the_queue(QueueNameDead, Config),
|
kill_the_queue(QueueNameDead, Config),
|
||||||
|
|
||||||
amqp_channel:call(Ch, #'basic.publish'{exchange = ExchangeName,
|
amqp_channel:cast(Ch, #'basic.publish'{exchange = ExchangeName,
|
||||||
routing_key = <<"route">>},
|
routing_key = <<"route">>},
|
||||||
#amqp_msg{payload = <<"HI">>}),
|
#amqp_msg{payload = <<"HI">>}),
|
||||||
|
|
||||||
receive
|
receive
|
||||||
{'basic.nack',_,_,_} -> ok;
|
{'basic.nack',_,_,_} -> ok;
|
||||||
{'basic.ack',_,_} -> error(expecting_nack_got_ack)
|
{'basic.ack',_,_} -> error(expecting_nack_got_ack)
|
||||||
after 50000 ->
|
after 10000 ->
|
||||||
error({timeout_waiting_for_nack, process_info(self(), messages)})
|
error({timeout_waiting_for_nack, process_info(self(), messages)})
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue