Add activate_stream_consumer command

New CLI command to trigger a rebalancing in a SAC group and activate a
consumer. This is a last resort solution if all consumers in a group
accidently end up in {connected, waiting} state.

The command re-uses an existing function, which only picks the consumer
that should be active. This means it does not try to "fix" the state
(e.g. removing a disconnected consumer because its node is definitely
gone from the cluster).

Fixes #14055
This commit is contained in:
Arnaud Cogoluègnes 2025-06-10 16:51:08 +02:00
parent 58f4e83c22
commit 41acc117bd
No known key found for this signature in database
GPG Key ID: D5C8C4DFAD43AFA8
4 changed files with 257 additions and 5 deletions

View File

@ -128,7 +128,7 @@ unregister_consumer(VirtualHost,
-spec activate_consumer(binary(), binary(), binary()) ->
ok | {error, sac_error() | term()}.
activate_consumer(VH, Stream, Name) ->
process_command(#command_activate_consumer{vhost =VH,
process_command(#command_activate_consumer{vhost = VH,
stream = Stream,
consumer_name= Name}).
@ -323,7 +323,13 @@ apply(#command_activate_consumer{vhost = VirtualHost,
end,
StreamGroups1 = update_groups(VirtualHost, Stream, ConsumerName,
G, StreamGroups0),
{State0#?MODULE{groups = StreamGroups1}, ok, Eff};
R = case G of
undefined ->
{error, not_found};
_ ->
ok
end,
{State0#?MODULE{groups = StreamGroups1}, R, Eff};
apply(#command_connection_reconnected{pid = Pid},
#?MODULE{groups = Groups0} = State0) ->
{State1, Eff} =
@ -1157,9 +1163,8 @@ maybe_create_group(VirtualHost,
#{{VirtualHost, Stream, ConsumerName} := _} ->
{ok, StreamGroups};
SGS ->
{ok, maps:put({VirtualHost, Stream, ConsumerName},
#group{consumers = [], partition_index = PartitionIndex},
SGS)}
{ok, SGS#{{VirtualHost, Stream, ConsumerName} =>
#group{consumers = [], partition_index = PartitionIndex}}}
end.
lookup_group(VirtualHost, Stream, ConsumerName, StreamGroups) ->

View File

@ -949,6 +949,82 @@ active_consumer_super_stream_disconn_active_block_rebalancing_test(_) ->
assertEmpty(Eff),
ok.
activate_consumer_simple_unblock_all_waiting_test(_) ->
P = self(),
GId = group_id(),
Group = grp([csr(P, 0, {connected, waiting}),
csr(P, 1, {connected, waiting}),
csr(P, 2, {connected, waiting})]),
Groups0 = #{GId => Group},
State0 = state(Groups0),
Cmd = activate_consumer_command(stream(), name()),
{#?STATE{groups = Groups1}, ok, Eff} = ?MOD:apply(Cmd, State0),
assertHasGroup(GId, grp([csr(P, 0, {connected, active}),
csr(P, 1, {connected, waiting}),
csr(P, 2, {connected, waiting})]),
Groups1),
assertContainsActivateMessage(P, 0, Eff),
ok.
activate_consumer_simple_unblock_ignore_disconnected_test(_) ->
P = self(),
GId = group_id(),
Group = grp([csr(P, 0, {disconnected, waiting}),
csr(P, 1, {connected, waiting}),
csr(P, 2, {connected, waiting}),
csr(P, 3, {connected, waiting})]),
Groups0 = #{GId => Group},
State0 = state(Groups0),
Cmd = activate_consumer_command(stream(), name()),
{#?STATE{groups = Groups1}, ok, Eff} = ?MOD:apply(Cmd, State0),
assertHasGroup(GId, grp([csr(P, 0, {disconnected, waiting}),
csr(P, 1, {connected, active}),
csr(P, 2, {connected, waiting}),
csr(P, 3, {connected, waiting})]),
Groups1),
assertContainsActivateMessage(P, 1, Eff),
ok.
activate_consumer_super_stream_unblock_all_waiting_test(_) ->
P = self(),
GId = group_id(),
Group = grp(1, [csr(P, 0, {connected, waiting}),
csr(P, 1, {connected, waiting}),
csr(P, 2, {connected, waiting})]),
Groups0 = #{GId => Group},
State0 = state(Groups0),
Cmd = activate_consumer_command(stream(), name()),
{#?STATE{groups = Groups1}, ok, Eff} = ?MOD:apply(Cmd, State0),
assertHasGroup(GId, grp(1, [csr(P, 0, {connected, waiting}),
csr(P, 1, {connected, active}),
csr(P, 2, {connected, waiting})]),
Groups1),
assertContainsActivateMessage(P, 1, Eff),
ok.
activate_consumer_super_stream_unblock_ignore_disconnected_test(_) ->
P = self(),
GId = group_id(),
Group = grp(1, [csr(P, 0, {disconnected, waiting}),
csr(P, 1, {connected, waiting}),
csr(P, 2, {connected, waiting}),
csr(P, 3, {connected, waiting})]),
Groups0 = #{GId => Group},
State0 = state(Groups0),
Cmd = activate_consumer_command(stream(), name()),
{#?STATE{groups = Groups1}, ok, Eff} = ?MOD:apply(Cmd, State0),
assertHasGroup(GId, grp(1, [csr(P, 0, {disconnected, waiting}),
csr(P, 1, {connected, waiting}),
csr(P, 2, {connected, active}),
csr(P, 3, {connected, waiting})]),
Groups1),
assertContainsActivateMessage(P, 2, Eff),
ok.
handle_connection_down_simple_disconn_active_block_rebalancing_test(_) ->
Pid0 = new_process(),
Pid1 = new_process(),
@ -1729,6 +1805,10 @@ assertContainsCheckConnectionEffect(Pid, Effects) ->
assertContainsSendMessageEffect(Pid, Stream, Active, Effects) ->
assertContainsSendMessageEffect(Pid, 0, Stream, name(), Active, Effects).
assertContainsActivateMessage(Pid, SubId, Effects) ->
assertContainsSendMessageEffect(Pid, SubId, stream(), name(),
true, Effects).
assertContainsActivateMessage(Pid, Effects) ->
assertContainsSendMessageEffect(Pid, sub_id(), stream(), name(),
true, Effects).

View File

@ -0,0 +1,99 @@
%% The contents of this file are subject to the Mozilla Public License
%% Version 2.0 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License
%% at https://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and
%% limitations under the License.
%%
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is GoPivotal, Inc.
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
-module('Elixir.RabbitMQ.CLI.Ctl.Commands.ActivateStreamConsumerCommand').
-include_lib("rabbitmq_stream_common/include/rabbit_stream.hrl").
-behaviour('Elixir.RabbitMQ.CLI.CommandBehaviour').
-export([formatter/0,
scopes/0,
switches/0,
aliases/0,
usage/0,
usage_additional/0,
usage_doc_guides/0,
banner/2,
validate/2,
merge_defaults/2,
run/2,
output/2,
description/0,
help_section/0]).
formatter() ->
'Elixir.RabbitMQ.CLI.Formatters.String'.
scopes() ->
[ctl, streams].
switches() ->
[{stream, string}, {reference, string}].
aliases() ->
[].
description() ->
<<"Trigger a rebalancing to activate a consumer in "
"a single active consumer group">>.
help_section() ->
{plugin, stream}.
validate([], #{stream := _, reference := _}) ->
ok;
validate(Args, _) when is_list(Args) andalso length(Args) > 0 ->
{validation_failure, too_many_args};
validate(_, _) ->
{validation_failure, not_enough_args}.
merge_defaults(_Args, Opts) ->
{[], maps:merge(#{vhost => <<"/">>}, Opts)}.
usage() ->
<<"activate_stream_consumer --stream <stream> "
"--reference <reference> [--vhost <vhost>]">>.
usage_additional() ->
<<"debugging command, use only when a group does not have "
"an active consumer">>.
usage_doc_guides() ->
[?STREAMS_GUIDE_URL].
run(_,
#{node := NodeName,
vhost := VHost,
stream := Stream,
reference := Reference,
timeout := Timeout}) ->
rabbit_misc:rpc_call(NodeName,
rabbit_stream_sac_coordinator,
activate_consumer,
[VHost, Stream, Reference],
Timeout).
banner(_, _) ->
<<"Activating a consumer in the group ...">>.
output(ok, _Opts) ->
'Elixir.RabbitMQ.CLI.DefaultOutput':output({ok,
<<"OK">>});
output({error, not_found}, _Opts) ->
'Elixir.RabbitMQ.CLI.DefaultOutput':output({error_string,
<<"The group does not exist">>});
output(Result, _Opts) ->
'Elixir.RabbitMQ.CLI.DefaultOutput':output(Result).

View File

@ -33,6 +33,9 @@
'Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamGroupConsumersCommand').
-define(COMMAND_LIST_STREAM_TRACKING,
'Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamTrackingCommand').
-define(COMMAND_ACTIVATE_STREAM_CONSUMER,
'Elixir.RabbitMQ.CLI.Ctl.Commands.ActivateStreamConsumerCommand').
all() ->
[{group, list_connections},
@ -40,6 +43,7 @@ all() ->
{group, list_publishers},
{group, list_consumer_groups},
{group, list_group_consumers},
{group, activate_consumer},
{group, list_stream_tracking},
{group, super_streams}].
@ -57,6 +61,9 @@ groups() ->
{list_group_consumers, [],
[list_group_consumers_validate, list_group_consumers_merge_defaults,
list_group_consumers_run]},
{activate_consumer, [],
[activate_consumer_validate, activate_consumer_merge_defaults,
activate_consumer_run]},
{list_stream_tracking, [],
[list_stream_tracking_validate, list_stream_tracking_merge_defaults,
list_stream_tracking_run]},
@ -524,6 +531,67 @@ list_group_consumers_run(Config) ->
close(S, C),
ok.
activate_consumer_validate(_) ->
Cmd = ?COMMAND_ACTIVATE_STREAM_CONSUMER,
ValidOpts = #{vhost => <<"/">>,
stream => <<"s1">>,
reference => <<"foo">>},
?assertMatch({validation_failure, not_enough_args},
Cmd:validate([], #{})),
?assertMatch({validation_failure, not_enough_args},
Cmd:validate([], #{vhost => <<"test">>})),
?assertMatch({validation_failure, too_many_args},
Cmd:validate([<<"foo">>], ValidOpts)),
?assertMatch(ok, Cmd:validate([], ValidOpts)).
activate_consumer_merge_defaults(_Config) ->
Cmd = ?COMMAND_ACTIVATE_STREAM_CONSUMER,
Opts = #{vhost => <<"/">>,
stream => <<"s1">>,
reference => <<"foo">>},
?assertEqual({[], Opts},
Cmd:merge_defaults([], maps:without([vhost], Opts))),
Merged = maps:merge(Opts, #{vhost => "vhost"}),
?assertEqual({[], Merged},
Cmd:merge_defaults([], Merged)).
activate_consumer_run(Config) ->
Cmd = ?COMMAND_ACTIVATE_STREAM_CONSUMER,
Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
Opts =#{node => Node,
timeout => 10000,
vhost => <<"/">>},
Args = [],
St = atom_to_binary(?FUNCTION_NAME, utf8),
ConsumerReference = <<"foo">>,
OptsGroup = maps:merge(#{stream => St, reference => ConsumerReference},
Opts),
%% the group does not exist yet
?assertEqual({error, not_found}, Cmd:run(Args, OptsGroup)),
StreamPort = rabbit_stream_SUITE:get_stream_port(Config),
{S, C} = start_stream_connection(StreamPort),
?awaitMatch(1, connection_count(Config), ?WAIT),
SubProperties =#{<<"single-active-consumer">> => <<"true">>,
<<"name">> => ConsumerReference},
create_stream(S, St, C),
subscribe(S, 0, St, SubProperties, C),
handle_consumer_update(S, C, 0),
subscribe(S, 1, St, SubProperties, C),
subscribe(S, 2, St, SubProperties, C),
?awaitMatch(3, consumer_count(Config), ?WAIT),
?assertEqual(ok, Cmd:run(Args, OptsGroup)),
delete_stream(S, St, C),
close(S, C),
ok.
handle_consumer_update(S, C0, SubId) ->
{{request, CorrId, {consumer_update, SubId, true}}, C1} =
rabbit_stream_SUITE:receive_commands(gen_tcp, S, C0),