Merge pull request #14170 from rabbitmq/cli-reset-offset
Trigger a 4.2.x alpha release build / trigger_alpha_build (push) Waiting to run Details
Test (make) / Build and Xref (1.18, 26) (push) Waiting to run Details
Test (make) / Build and Xref (1.18, 27) (push) Waiting to run Details
Test (make) / Build and Xref (1.18, 28) (push) Waiting to run Details
Test (make) / Test (1.18, 28, khepri) (push) Waiting to run Details
Test (make) / Test (1.18, 28, mnesia) (push) Waiting to run Details
Test (make) / Test mixed clusters (1.18, 28, khepri) (push) Waiting to run Details
Test (make) / Test mixed clusters (1.18, 28, mnesia) (push) Waiting to run Details
Test (make) / Type check (1.18, 28) (push) Waiting to run Details

Add rabbitmq-streams reset_offset command
This commit is contained in:
Arnaud Cogoluègnes 2025-07-01 16:19:54 +00:00 committed by GitHub
commit 3e615803be
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 282 additions and 54 deletions

View File

@ -453,6 +453,14 @@ List only writer deduplication tracking information.
Example:
.Sp
.Dl rabbitmq-streams list_stream_tracking stream-1 --offset
.\" ------------------------------------------------------------------
.It Cm reset_offset Fl -stream Ar stream Fl -reference Ar reference Oo Fl -vhost Ar vhost Oc
.Pp
Reset the stored offset for a consumer name on a stream.
.Pp
Example:
.Sp
.Dl rabbitmq-streams reset_offset --stream stream --reference app-1
.El
.\" ------------------------------------------------------------------
.Sh SEE ALSO

View File

@ -0,0 +1,116 @@
%% The contents of this file are subject to the Mozilla Public License
%% Version 2.0 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License
%% at https://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and
%% limitations under the License.
%%
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is GoPivotal, Inc.
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
-module('Elixir.RabbitMQ.CLI.Ctl.Commands.ResetOffsetCommand').
-include_lib("rabbitmq_stream_common/include/rabbit_stream.hrl").
-include_lib("rabbitmq_stream/src/rabbit_stream_utils.hrl").
-behaviour('Elixir.RabbitMQ.CLI.CommandBehaviour').
-export([formatter/0,
scopes/0,
switches/0,
aliases/0,
usage/0,
usage_additional/0,
usage_doc_guides/0,
banner/2,
validate/2,
merge_defaults/2,
run/2,
output/2,
description/0,
help_section/0]).
formatter() ->
'Elixir.RabbitMQ.CLI.Formatters.String'.
scopes() ->
[streams].
switches() ->
[{stream, string}, {reference, string}].
aliases() ->
[].
description() ->
<<"Reset the stored offset for a consumer name on a stream">>.
help_section() ->
{plugin, stream}.
validate([], #{stream := _, reference := R}) when ?IS_INVALID_REF(R) ->
{validation_failure, reference_too_long};
validate([], #{stream := _, reference := _}) ->
ok;
validate(Args, _) when is_list(Args) andalso length(Args) > 0 ->
{validation_failure, too_many_args};
validate(_, _) ->
{validation_failure, not_enough_args}.
merge_defaults(Args, Opts) ->
{Args, maps:merge(#{vhost => <<"/">>}, Opts)}.
usage() ->
<<"reset_offset --stream <stream> "
"--reference <reference> [--vhost <vhost>]">>.
usage_additional() ->
<<"">>.
usage_doc_guides() ->
[?STREAMS_GUIDE_URL].
run(_,
#{node := NodeName,
vhost := VHost,
stream := Stream,
reference := Reference,
timeout := Timeout}) ->
rabbit_misc:rpc_call(NodeName,
rabbit_stream_manager,
reset_offset,
[VHost, Stream, Reference],
Timeout).
banner(_, _) ->
<<"Resetting stored offset ...">>.
output(ok, Opts) ->
Silent = maps:get(quiet, Opts, maps:get(silent, Opts, false)),
case Silent of
true ->
'Elixir.RabbitMQ.CLI.DefaultOutput':output(ok);
false ->
'Elixir.RabbitMQ.CLI.DefaultOutput':output({ok, <<"Done">>})
end;
output({validation_failure, reference_too_long}, _Opts) ->
'Elixir.RabbitMQ.CLI.DefaultOutput':output({error,
<<"The reference is too long">>});
output({error, not_found}, _Opts) ->
'Elixir.RabbitMQ.CLI.DefaultOutput':output({error,
<<"The stream does not exist">>});
output({error, not_available}, _Opts) ->
'Elixir.RabbitMQ.CLI.DefaultOutput':output({error,
<<"The stream is not available">>});
output({error, no_reference}, _Opts) ->
'Elixir.RabbitMQ.CLI.DefaultOutput':output({error,
<<"There is no stored offset "
"for this reference, no need to reset">>});
output(R, _Opts) ->
'Elixir.RabbitMQ.CLI.DefaultOutput':output(R).

View File

@ -21,6 +21,7 @@
-include_lib("rabbit_common/include/rabbit_framing.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").
-include_lib("rabbit/include/amqqueue.hrl").
-include_lib("rabbitmq_stream/src/rabbit_stream_utils.hrl").
%% API
-export([create/4,
@ -33,7 +34,8 @@
topology/2,
route/3,
partitions/2,
partition_index/3]).
partition_index/3,
reset_offset/3]).
-spec create(binary(), binary(), #{binary() => binary()}, binary()) ->
{ok, map()} |
@ -396,6 +398,27 @@ partition_index(VirtualHost, SuperStream, Stream) ->
{error, stream_not_found}
end.
-spec reset_offset(binary(), binary(), binary()) ->
ok |
{error, not_available | not_found | no_reference |
{validation_failed, term()}}.
reset_offset(_, _, Ref) when ?IS_INVALID_REF(Ref) ->
{error, {validation_failed,
rabbit_misc:format("Reference is too long to store offset: ~p",
[byte_size(Ref)])}};
reset_offset(VH, S, Ref) ->
case lookup_leader(VH, S) of
{ok, P} ->
case osiris:read_tracking(P, offset, Ref) of
undefined ->
{error, no_reference};
{offset, _} ->
osiris:write_tracking(P, Ref, {offset, 0})
end;
R ->
R
end.
stream_queue_arguments(Arguments) ->
stream_queue_arguments([{<<"x-queue-type">>, longstr, <<"stream">>}],
Arguments).

View File

@ -19,6 +19,7 @@
-behaviour(gen_statem).
-include("rabbit_stream_utils.hrl").
-include("rabbit_stream_reader.hrl").
-include("rabbit_stream_metrics.hrl").
@ -80,7 +81,6 @@
peer_cert_validity]).
-define(UNKNOWN_FIELD, unknown_field).
-define(SILENT_CLOSE_DELAY, 3_000).
-define(IS_INVALID_REF(Ref), is_binary(Ref) andalso byte_size(Ref) > 255).
-define(SAC_MOD, rabbit_stream_sac_coordinator).
-import(rabbit_stream_utils, [check_write_permitted/2,

View File

@ -0,0 +1,16 @@
%% The contents of this file are subject to the Mozilla Public License
%% at https://www.mozilla.org/en-US/MPL/2.0/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and
%% limitations under the License.
%%
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is Pivotal Software, Inc.
%% Copyright (c) 2025 Broadcom. All Rights Reserved.
%% The term Broadcom refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%
-define(IS_INVALID_REF(Ref), is_binary(Ref) andalso byte_size(Ref) > 255).

View File

@ -35,7 +35,8 @@
'Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamTrackingCommand').
-define(COMMAND_ACTIVATE_STREAM_CONSUMER,
'Elixir.RabbitMQ.CLI.Ctl.Commands.ActivateStreamConsumerCommand').
-define(COMMAND_RESET_OFFSET,
'Elixir.RabbitMQ.CLI.Ctl.Commands.ResetOffsetCommand').
all() ->
[{group, list_connections},
@ -45,6 +46,7 @@ all() ->
{group, list_group_consumers},
{group, activate_consumer},
{group, list_stream_tracking},
{group, reset_offset},
{group, super_streams}].
groups() ->
@ -67,6 +69,9 @@ groups() ->
{list_stream_tracking, [],
[list_stream_tracking_validate, list_stream_tracking_merge_defaults,
list_stream_tracking_run]},
{reset_offset, [],
[reset_offset_validate, reset_offset_merge_defaults,
reset_offset_run]},
{super_streams, [],
[add_super_stream_merge_defaults,
add_super_stream_validate,
@ -708,6 +713,65 @@ list_stream_tracking_run(Config) ->
close(S, C),
ok.
reset_offset_validate(_) ->
Cmd = ?COMMAND_RESET_OFFSET,
ValidOpts = #{vhost => <<"/">>,
stream => <<"s1">>,
reference => <<"foo">>},
?assertMatch({validation_failure, not_enough_args},
Cmd:validate([], #{})),
?assertMatch({validation_failure, not_enough_args},
Cmd:validate([], #{vhost => <<"test">>})),
?assertMatch({validation_failure, too_many_args},
Cmd:validate([<<"foo">>], ValidOpts)),
?assertMatch({validation_failure, reference_too_long},
Cmd:validate([], ValidOpts#{reference => gen_bin(256)})),
?assertMatch(ok, Cmd:validate([], ValidOpts)),
?assertMatch(ok, Cmd:validate([], ValidOpts#{reference => gen_bin(255)})).
reset_offset_merge_defaults(_Config) ->
Cmd = ?COMMAND_RESET_OFFSET,
Opts = #{vhost => <<"/">>,
stream => <<"s1">>,
reference => <<"foo">>},
?assertEqual({[], Opts},
Cmd:merge_defaults([], maps:without([vhost], Opts))),
Merged = maps:merge(Opts, #{vhost => "vhost"}),
?assertEqual({[], Merged},
Cmd:merge_defaults([], Merged)).
reset_offset_run(Config) ->
Cmd = ?COMMAND_RESET_OFFSET,
Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
Opts =#{node => Node,
timeout => 10000,
vhost => <<"/">>},
Args = [],
St = atom_to_binary(?FUNCTION_NAME, utf8),
Ref = <<"foo">>,
OptsGroup = maps:merge(#{stream => St, reference => Ref},
Opts),
%% the stream does not exist yet
?assertMatch({error, not_found},
Cmd:run(Args, OptsGroup)),
Port = rabbit_stream_SUITE:get_stream_port(Config),
{S, C} = start_stream_connection(Port),
create_stream(S, St, C),
?assertEqual({error, no_reference}, Cmd:run(Args, OptsGroup)),
store_offset(S, St, Ref, 42, C),
check_stored_offset(S, St, Ref, 42, C),
?assertMatch(ok, Cmd:run(Args, OptsGroup)),
check_stored_offset(S, St, Ref, 0, C),
delete_stream(S, St, C),
close(S, C),
ok.
add_super_stream_merge_defaults(_Config) ->
?assertMatch({[<<"super-stream">>],
#{partitions := 3, vhost := <<"/">>}},
@ -1024,6 +1088,10 @@ store_offset(S, Stream, Reference, Value, C) ->
{error, offset_not_stored}
end.
check_stored_offset(S, Stream, Reference, Expected, C) ->
check_stored_offset(S, Stream, Reference, Expected, C, 20).
check_stored_offset(_, _, _, _, _, 0) ->
error;
check_stored_offset(S, Stream, Reference, Expected, C, Attempt) ->
@ -1061,3 +1129,5 @@ check_publisher_sequence(S, Stream, Reference, Expected, C, Attempt) ->
check_publisher_sequence(S, Stream, Reference, Expected, C, Attempt - 1)
end.
gen_bin(L) ->
list_to_binary(lists:duplicate(L, "a")).

View File

@ -20,7 +20,8 @@ groups() ->
[manage_super_stream,
lookup_leader,
lookup_member,
partition_index]}].
partition_index,
reset_offset]}].
%% -------------------------------------------------------------------
%% Testsuite setup/teardown.
@ -196,73 +197,67 @@ partition_index(Config) ->
amqp_connection:close(C),
ok.
reset_offset(Config) ->
S = atom_to_binary(?FUNCTION_NAME, utf8),
Ref = <<"app">>,
?assertMatch({ok, _}, create_stream(Config, S)),
{ok, Pid} = lookup_leader(Config, S),
?assertEqual(undefined, query_offset(Config, Pid, Ref)),
?assertEqual({error, no_reference}, reset_offset(Config, S, Ref)),
ok = store_offset(Config, Pid, Ref, 42),
?assertEqual({offset, 42}, query_offset(Config, Pid, Ref)),
?assertEqual(ok, reset_offset(Config, S, Ref)),
?assertEqual({offset, 0}, query_offset(Config, Pid, Ref)),
?assertEqual({error, not_found},
reset_offset(Config, <<"does-not-exist">>, Ref)),
?assertEqual({ok, deleted}, delete_stream(Config, S)).
query_offset(Config, Pid, Ref) ->
rpc(Config, osiris, read_tracking, [Pid, Ref]).
store_offset(Config, Pid, Ref, Offset) ->
rpc(Config, osiris, write_tracking, [Pid, Ref, {offset, Offset}]).
reset_offset(Config, S, Ref) ->
rpc(Config, rabbit_stream_manager, reset_offset, [<<"/">>, S, Ref]).
create_super_stream(Config, Name, Partitions, RKs) ->
rabbit_ct_broker_helpers:rpc(Config,
0,
rabbit_stream_manager,
create_super_stream,
[<<"/">>,
Name,
Partitions,
#{},
RKs,
<<"guest">>]).
rpc(Config, rabbit_stream_manager, create_super_stream,
[<<"/">>, Name, Partitions, #{}, RKs, <<"guest">>]).
delete_super_stream(Config, Name) ->
rabbit_ct_broker_helpers:rpc(Config,
0,
rabbit_stream_manager,
delete_super_stream,
[<<"/">>, Name, <<"guest">>]).
rpc(Config, rabbit_stream_manager, delete_super_stream,
[<<"/">>, Name, <<"guest">>]).
create_stream(Config, Name) ->
rabbit_ct_broker_helpers:rpc(Config,
0,
rabbit_stream_manager,
create,
[<<"/">>, Name, [], <<"guest">>]).
rpc(Config, rabbit_stream_manager, create, [<<"/">>, Name, [], <<"guest">>]).
delete_stream(Config, Name) ->
rabbit_ct_broker_helpers:rpc(Config,
0,
rabbit_stream_manager,
delete,
[<<"/">>, Name, <<"guest">>]).
rpc(Config, rabbit_stream_manager, delete, [<<"/">>, Name, <<"guest">>]).
lookup_leader(Config, Name) ->
rabbit_ct_broker_helpers:rpc(Config,
0,
rabbit_stream_manager,
lookup_leader,
[<<"/">>, Name]).
rpc(Config, rabbit_stream_manager, lookup_leader, [<<"/">>, Name]).
lookup_member(Config, Name) ->
rabbit_ct_broker_helpers:rpc(Config,
0,
rabbit_stream_manager,
lookup_member,
[<<"/">>, Name]).
rpc(Config, rabbit_stream_manager, lookup_member, [<<"/">>, Name]).
partitions(Config, Name) ->
rabbit_ct_broker_helpers:rpc(Config,
0,
rabbit_stream_manager,
partitions,
[<<"/">>, Name]).
rpc(Config, rabbit_stream_manager, partitions, [<<"/">>, Name]).
route(Config, RoutingKey, SuperStream) ->
rabbit_ct_broker_helpers:rpc(Config,
0,
rabbit_stream_manager,
route,
[RoutingKey, <<"/">>, SuperStream]).
rpc(Config, rabbit_stream_manager, route,
[RoutingKey, <<"/">>, SuperStream]).
partition_index(Config, SuperStream, Stream) ->
rabbit_ct_broker_helpers:rpc(Config,
0,
rabbit_stream_manager,
partition_index,
[<<"/">>, SuperStream, Stream]).
rpc(Config, rabbit_stream_manager, partition_index,
[<<"/">>, SuperStream, Stream]).
rpc(Config, M, F, A) ->
rabbit_ct_broker_helpers:rpc(Config, 0, M, F, A).
start_amqp_connection(Config) ->
Port =