Fix flake in test consume_from_replica

```
make -C deps/rabbit ct-rabbit_stream_queue t=cluster_size_3_parallel_1 RABBITMQ_METADATA_STORE=mnesia
```

flaked prior to this commit locally on Ubuntu with the following error after 11 runs:
```
rabbit_stream_queue_SUITE > cluster_size_3_parallel_1 > consume_from_replica
{error,
    {{shutdown,
         {server_initiated_close,406,
             <<"PRECONDITION_FAILED - stream queue 'consume_from_replica' in vhost '/' does not have a running replica on the local node">>}},
     {gen_server,call,
         [<0.8365.0>,
          {subscribe,
              {'basic.consume',0,<<"consume_from_replica">>,
                  <<"ctag">>,false,false,false,false,
                  [{<<"x-stream-offset">>,long,0}]},
              <0.8151.0>},
          infinity]}}}
```
This commit is contained in:
David Ansari 2025-02-14 10:11:33 +00:00
parent 13e24d3172
commit 0ee5e74a73
4 changed files with 13 additions and 31 deletions

View File

@ -6669,27 +6669,10 @@ ra_name(Q) ->
wait_for_local_member(<<"stream">>, QName, Config) ->
%% If it is a stream we need to wait until there is a local member
%% on the node we want to subscribe from before proceeding.
rabbit_ct_helpers:await_condition(
fun() -> rpc(Config, 0, ?MODULE, has_local_member,
[rabbit_misc:r(<<"/">>, queue, QName)])
end, 30_000);
ok = queue_utils:wait_for_local_stream_member(0, <<"/">>, QName, Config);
wait_for_local_member(_, _, _) ->
ok.
has_local_member(QName) ->
case rabbit_amqqueue:lookup(QName) of
{ok, Q} ->
#{name := StreamId} = amqqueue:get_type_state(Q),
case rabbit_stream_coordinator:local_pid(StreamId) of
{ok, Pid} ->
is_process_alive(Pid);
{error, _} ->
false
end;
{error, _} ->
false
end.
-spec find_event(Type, Props, Events) -> Ret when
Type :: atom(),
Props :: proplists:proplist(),

View File

@ -240,11 +240,7 @@ stream(Config) ->
SubCh = rabbit_ct_client_helpers:open_channel(Config, 2),
qos(SubCh, 10, false),
%% wait for local replica
rabbit_ct_helpers:await_condition(
fun() ->
queue_utils:has_local_stream_member(Config, 2, QName, <<"/">>)
end, 60000),
ok = queue_utils:wait_for_local_stream_member(2, <<"/">>, QName, Config),
try
amqp_channel:subscribe(

View File

@ -14,7 +14,7 @@
ra_name/1,
fifo_machines_use_same_version/1,
fifo_machines_use_same_version/2,
has_local_stream_member/4,
wait_for_local_stream_member/4,
has_local_stream_member_rpc/1
]).
@ -170,11 +170,13 @@ fifo_machines_use_same_version(Config, Nodenames)
|| Nodename <- Nodenames],
lists:all(fun(V) -> V =:= MachineAVersion end, OtherMachinesVersions).
has_local_stream_member(Config, Node, QName, VHost) ->
QRes = rabbit_misc:r(VHost, queue, QName),
rabbit_ct_broker_helpers:rpc(Config, Node, ?MODULE,
has_local_stream_member_rpc,
[QRes]).
wait_for_local_stream_member(Node, Vhost, QNameBin, Config) ->
QName = rabbit_misc:queue_resource(Vhost, QNameBin),
rabbit_ct_helpers:await_condition(
fun() ->
rabbit_ct_broker_helpers:rpc(
Config, Node, ?MODULE, has_local_stream_member_rpc, [QName])
end, 60_000).
has_local_stream_member_rpc(QName) ->
case rabbit_amqqueue:lookup(QName) of
@ -183,9 +185,9 @@ has_local_stream_member_rpc(QName) ->
case rabbit_stream_coordinator:local_pid(StreamId) of
{ok, Pid} ->
is_process_alive(Pid);
_ ->
{error, _} ->
false
end;
_Err ->
{error, _} ->
false
end.

View File

@ -1734,6 +1734,7 @@ consume_from_replica(Config) ->
Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server3),
qos(Ch2, 10, false),
ok = queue_utils:wait_for_local_stream_member(Server3, <<"/">>, Q, Config),
subscribe(Ch2, Q, false, 0),
receive_batch(Ch2, 0, 99),
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).