parent
ac9c9436c9
commit
9a71595b5f
|
|
@ -564,11 +564,13 @@ handle_event(QName, {osiris_offset, _From, _Offs},
|
|||
{ok, State#stream_client{readers = Readers}, Deliveries};
|
||||
handle_event(_QName, {stream_leader_change, Pid}, State) ->
|
||||
{ok, update_leader_pid(Pid, State), []};
|
||||
handle_event(_QName, {stream_local_member_change, Pid}, #stream_client{local_pid = P} = State)
|
||||
handle_event(_QName, {stream_local_member_change, Pid},
|
||||
#stream_client{local_pid = P} = State)
|
||||
when P == Pid ->
|
||||
{ok, State, []};
|
||||
handle_event(_QName, {stream_local_member_change, Pid}, State = #stream_client{name = QName,
|
||||
readers = Readers0}) ->
|
||||
handle_event(_QName, {stream_local_member_change, Pid},
|
||||
#stream_client{name = QName,
|
||||
readers = Readers0} = State) ->
|
||||
rabbit_log:debug("Local member change event for ~tp", [QName]),
|
||||
Readers1 = maps:fold(fun(T, #stream{log = Log0, reader_options = Options} = S0, Acc) ->
|
||||
Offset = osiris_log:next_offset(Log0),
|
||||
|
|
|
|||
|
|
@ -131,14 +131,30 @@ end_per_testcase(Testcase, Config) ->
|
|||
enable_ff(Config) ->
|
||||
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
|
||||
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
|
||||
QueueType = ?config(queue_type, Config),
|
||||
QName = ?config(queue_name, Config),
|
||||
?assertEqual({'queue.declare_ok', QName, 0, 0},
|
||||
declare(Ch, QName, [{<<"x-queue-type">>, longstr,
|
||||
?config(queue_type, Config)}])),
|
||||
declare(Ch, QName,
|
||||
[{<<"x-queue-type">>, longstr, QueueType}])),
|
||||
#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
|
||||
amqp_channel:register_confirm_handler(Ch, self()),
|
||||
|
||||
timer:sleep(100),
|
||||
case QueueType of
|
||||
<<"stream">> ->
|
||||
%% if it is a stream we need to wait until there is a local member
|
||||
%% on the node we want to subscibe from before proceeding
|
||||
rabbit_ct_helpers:await_condition(
|
||||
fun() ->
|
||||
rabbit_ct_broker_helpers:rpc(Config, 2, ?MODULE,
|
||||
has_local_member,
|
||||
[#resource{kind = queue,
|
||||
virtual_host = <<"/">>,
|
||||
name = QName}])
|
||||
end, 60000),
|
||||
ok;
|
||||
_ ->
|
||||
ok
|
||||
end,
|
||||
|
||||
ConsumerTag1 = <<"ctag1">>,
|
||||
Ch2 = rabbit_ct_client_helpers:open_channel(Config, 2),
|
||||
|
|
@ -243,3 +259,17 @@ get_global_counters(Config) ->
|
|||
qos(Ch, Prefetch) ->
|
||||
?assertMatch(#'basic.qos_ok'{},
|
||||
amqp_channel:call(Ch, #'basic.qos'{prefetch_count = Prefetch})).
|
||||
|
||||
has_local_member(QName) ->
|
||||
case rabbit_amqqueue:lookup(QName) of
|
||||
{ok, Q} ->
|
||||
#{name := StreamId} = amqqueue:get_type_state(Q),
|
||||
case rabbit_stream_coordinator:local_pid(StreamId) of
|
||||
{ok, Pid} ->
|
||||
is_process_alive(Pid);
|
||||
_ ->
|
||||
false
|
||||
end;
|
||||
_Err ->
|
||||
false
|
||||
end.
|
||||
|
|
|
|||
|
|
@ -2145,9 +2145,12 @@ leader_locator_balanced_maintenance(Config) ->
|
|||
declare(Config, Server1, Q, [{<<"x-queue-type">>, longstr, <<"stream">>},
|
||||
{<<"x-queue-leader-locator">>, longstr, <<"balanced">>}])),
|
||||
|
||||
Info = find_queue_info(Config, [leader]),
|
||||
Leader = proplists:get_value(leader, Info),
|
||||
?assert(lists:member(Leader, [Server1, Server2])),
|
||||
rabbit_ct_helpers:await_condition(
|
||||
fun() ->
|
||||
Info = find_queue_info(Config, [leader]),
|
||||
Leader = proplists:get_value(leader, Info),
|
||||
lists:member(Leader, [Server1, Server2])
|
||||
end, 60000),
|
||||
|
||||
true = rabbit_ct_broker_helpers:unmark_as_being_drained(Config, Server3),
|
||||
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
|
||||
|
|
|
|||
Loading…
Reference in New Issue