diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl index a489ff89ad..4f1d260a62 100644 --- a/deps/rabbit/src/rabbit_stream_queue.erl +++ b/deps/rabbit/src/rabbit_stream_queue.erl @@ -564,11 +564,13 @@ handle_event(QName, {osiris_offset, _From, _Offs}, {ok, State#stream_client{readers = Readers}, Deliveries}; handle_event(_QName, {stream_leader_change, Pid}, State) -> {ok, update_leader_pid(Pid, State), []}; -handle_event(_QName, {stream_local_member_change, Pid}, #stream_client{local_pid = P} = State) +handle_event(_QName, {stream_local_member_change, Pid}, + #stream_client{local_pid = P} = State) when P == Pid -> {ok, State, []}; -handle_event(_QName, {stream_local_member_change, Pid}, State = #stream_client{name = QName, - readers = Readers0}) -> +handle_event(_QName, {stream_local_member_change, Pid}, + #stream_client{name = QName, + readers = Readers0} = State) -> rabbit_log:debug("Local member change event for ~tp", [QName]), Readers1 = maps:fold(fun(T, #stream{log = Log0, reader_options = Options} = S0, Acc) -> Offset = osiris_log:next_offset(Log0), diff --git a/deps/rabbit/test/message_containers_SUITE.erl b/deps/rabbit/test/message_containers_SUITE.erl index 1ada4e9c1e..3924986d08 100644 --- a/deps/rabbit/test/message_containers_SUITE.erl +++ b/deps/rabbit/test/message_containers_SUITE.erl @@ -131,14 +131,30 @@ end_per_testcase(Testcase, Config) -> enable_ff(Config) -> Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QueueType = ?config(queue_type, Config), QName = ?config(queue_name, Config), ?assertEqual({'queue.declare_ok', QName, 0, 0}, - declare(Ch, QName, [{<<"x-queue-type">>, longstr, - ?config(queue_type, Config)}])), + declare(Ch, QName, + [{<<"x-queue-type">>, longstr, QueueType}])), #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), amqp_channel:register_confirm_handler(Ch, self()), - timer:sleep(100), + case QueueType of + <<"stream">> -> + %% if it is a stream we need to wait until there is a local member + %% on the node we want to subscibe from before proceeding + rabbit_ct_helpers:await_condition( + fun() -> + rabbit_ct_broker_helpers:rpc(Config, 2, ?MODULE, + has_local_member, + [#resource{kind = queue, + virtual_host = <<"/">>, + name = QName}]) + end, 60000), + ok; + _ -> + ok + end, ConsumerTag1 = <<"ctag1">>, Ch2 = rabbit_ct_client_helpers:open_channel(Config, 2), @@ -243,3 +259,17 @@ get_global_counters(Config) -> qos(Ch, Prefetch) -> ?assertMatch(#'basic.qos_ok'{}, amqp_channel:call(Ch, #'basic.qos'{prefetch_count = Prefetch})). + +has_local_member(QName) -> + case rabbit_amqqueue:lookup(QName) of + {ok, Q} -> + #{name := StreamId} = amqqueue:get_type_state(Q), + case rabbit_stream_coordinator:local_pid(StreamId) of + {ok, Pid} -> + is_process_alive(Pid); + _ -> + false + end; + _Err -> + false + end. diff --git a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl index 6c186d70ca..79b8642fd5 100644 --- a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl +++ b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl @@ -2145,9 +2145,12 @@ leader_locator_balanced_maintenance(Config) -> declare(Config, Server1, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}, {<<"x-queue-leader-locator">>, longstr, <<"balanced">>}])), - Info = find_queue_info(Config, [leader]), - Leader = proplists:get_value(leader, Info), - ?assert(lists:member(Leader, [Server1, Server2])), + rabbit_ct_helpers:await_condition( + fun() -> + Info = find_queue_info(Config, [leader]), + Leader = proplists:get_value(leader, Info), + lists:member(Leader, [Server1, Server2]) + end, 60000), true = rabbit_ct_broker_helpers:unmark_as_being_drained(Config, Server3), rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).