Remove set_stream_retention_policy command
It is not working as expected. Policies are the way to change data retention for stream.
This commit is contained in:
parent
fa44b764b7
commit
7ea2ff2651
|
@ -5,7 +5,7 @@
|
||||||
.\"
|
.\"
|
||||||
.\" Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
|
.\" Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
|
||||||
.\"
|
.\"
|
||||||
.Dd June 22, 2023
|
.Dd February 18, 2025
|
||||||
.Dt RABBITMQ-STREAMS 8
|
.Dt RABBITMQ-STREAMS 8
|
||||||
.Os "RabbitMQ Server"
|
.Os "RabbitMQ Server"
|
||||||
.Sh NAME
|
.Sh NAME
|
||||||
|
@ -129,18 +129,6 @@ Example:
|
||||||
.Dl rabbitmq-streams restart_stream --vhost Qo a-vhost Qc Qo a-stream Qc --preferred-leader-node Qo node
|
.Dl rabbitmq-streams restart_stream --vhost Qo a-vhost Qc Qo a-stream Qc --preferred-leader-node Qo node
|
||||||
.\" ------------------------------------
|
.\" ------------------------------------
|
||||||
.El
|
.El
|
||||||
.Ss Policies
|
|
||||||
.Bl -tag -width Ds
|
|
||||||
.\" ------------------------------------
|
|
||||||
.It Cm set_stream_retention_policy Ar stream Ar policy Fl -vhost Ar virtual-host
|
|
||||||
.Pp
|
|
||||||
Set the retention policy of a stream.
|
|
||||||
.Pp
|
|
||||||
Example:
|
|
||||||
.Sp
|
|
||||||
.Dl rabbitmq-streams set_stream_retention_policy --vhost Qo a-vhost Qc Qo a-stream Qc Qo a-policy Qc
|
|
||||||
.\" ------------------------------------
|
|
||||||
.El
|
|
||||||
.Ss Stream plugin
|
.Ss Stream plugin
|
||||||
.Bl -tag -width Ds
|
.Bl -tag -width Ds
|
||||||
.\" ------------------------------------------------------------------
|
.\" ------------------------------------------------------------------
|
||||||
|
|
|
@ -42,7 +42,6 @@
|
||||||
|
|
||||||
-export([list_with_minimum_quorum/0]).
|
-export([list_with_minimum_quorum/0]).
|
||||||
|
|
||||||
-export([set_retention_policy/3]).
|
|
||||||
-export([restart_stream/3,
|
-export([restart_stream/3,
|
||||||
add_replica/3,
|
add_replica/3,
|
||||||
delete_replica/3,
|
delete_replica/3,
|
||||||
|
@ -1002,24 +1001,6 @@ update_leader_pid(Pid, #stream_client{} = State) ->
|
||||||
state_info(_) ->
|
state_info(_) ->
|
||||||
#{}.
|
#{}.
|
||||||
|
|
||||||
set_retention_policy(Name, VHost, Policy) ->
|
|
||||||
case rabbit_amqqueue:check_max_age(Policy) of
|
|
||||||
{error, _} = E ->
|
|
||||||
E;
|
|
||||||
MaxAge ->
|
|
||||||
QName = queue_resource(VHost, Name),
|
|
||||||
Fun = fun(Q) ->
|
|
||||||
Conf = amqqueue:get_type_state(Q),
|
|
||||||
amqqueue:set_type_state(Q, Conf#{max_age => MaxAge})
|
|
||||||
end,
|
|
||||||
case rabbit_amqqueue:update(QName, Fun) of
|
|
||||||
not_found ->
|
|
||||||
{error, not_found};
|
|
||||||
_ ->
|
|
||||||
ok
|
|
||||||
end
|
|
||||||
end.
|
|
||||||
|
|
||||||
-spec restart_stream(VHost :: binary(), Queue :: binary(),
|
-spec restart_stream(VHost :: binary(), Queue :: binary(),
|
||||||
#{preferred_leader_node => node()}) ->
|
#{preferred_leader_node => node()}) ->
|
||||||
{ok, node()} |
|
{ok, node()} |
|
||||||
|
|
|
@ -1,49 +0,0 @@
|
||||||
## This Source Code Form is subject to the terms of the Mozilla Public
|
|
||||||
## License, v. 2.0. If a copy of the MPL was not distributed with this
|
|
||||||
## file, You can obtain one at https://mozilla.org/MPL/2.0/.
|
|
||||||
##
|
|
||||||
## Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
|
|
||||||
|
|
||||||
defmodule RabbitMQ.CLI.Streams.Commands.SetStreamRetentionPolicyCommand do
|
|
||||||
alias RabbitMQ.CLI.Core.DocGuide
|
|
||||||
|
|
||||||
@behaviour RabbitMQ.CLI.CommandBehaviour
|
|
||||||
|
|
||||||
def merge_defaults(args, opts), do: {args, Map.merge(%{vhost: "/"}, opts)}
|
|
||||||
|
|
||||||
use RabbitMQ.CLI.Core.AcceptsTwoPositionalArguments
|
|
||||||
use RabbitMQ.CLI.Core.RequiresRabbitAppRunning
|
|
||||||
|
|
||||||
def run([name, retention_policy], %{node: node_name, vhost: vhost}) do
|
|
||||||
:rabbit_misc.rpc_call(node_name, :rabbit_stream_queue, :set_retention_policy, [
|
|
||||||
name,
|
|
||||||
vhost,
|
|
||||||
retention_policy
|
|
||||||
])
|
|
||||||
end
|
|
||||||
|
|
||||||
use RabbitMQ.CLI.DefaultOutput
|
|
||||||
|
|
||||||
def banner([name, retention_policy], _) do
|
|
||||||
"Setting retention policy of stream queue #{name} to #{retention_policy} ..."
|
|
||||||
end
|
|
||||||
|
|
||||||
def usage, do: "set_stream_retention_policy [--vhost <vhost>] <name> <policy>"
|
|
||||||
|
|
||||||
def usage_additional() do
|
|
||||||
[
|
|
||||||
["<name>", "stream queue name"],
|
|
||||||
["<policy>", "retention policy"]
|
|
||||||
]
|
|
||||||
end
|
|
||||||
|
|
||||||
def usage_doc_guides() do
|
|
||||||
[
|
|
||||||
DocGuide.streams()
|
|
||||||
]
|
|
||||||
end
|
|
||||||
|
|
||||||
def help_section(), do: :policies
|
|
||||||
|
|
||||||
def description(), do: "Sets the retention policy of a stream queue"
|
|
||||||
end
|
|
|
@ -1,68 +0,0 @@
|
||||||
## This Source Code Form is subject to the terms of the Mozilla Public
|
|
||||||
## License, v. 2.0. If a copy of the MPL was not distributed with this
|
|
||||||
## file, You can obtain one at https://mozilla.org/MPL/2.0/.
|
|
||||||
##
|
|
||||||
## Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
|
|
||||||
##
|
|
||||||
|
|
||||||
defmodule RabbitMQ.CLI.Streams.Commands.SetStreamRetentionPolicyCommandTest do
|
|
||||||
use ExUnit.Case, async: false
|
|
||||||
import TestHelper
|
|
||||||
|
|
||||||
@command RabbitMQ.CLI.Streams.Commands.SetStreamRetentionPolicyCommand
|
|
||||||
|
|
||||||
setup_all do
|
|
||||||
RabbitMQ.CLI.Core.Distribution.start()
|
|
||||||
|
|
||||||
:ok
|
|
||||||
end
|
|
||||||
|
|
||||||
setup context do
|
|
||||||
{:ok,
|
|
||||||
opts: %{
|
|
||||||
node: get_rabbit_hostname(),
|
|
||||||
timeout: context[:test_timeout] || 30000
|
|
||||||
}}
|
|
||||||
end
|
|
||||||
|
|
||||||
test "validate: when no arguments are provided, returns a failure" do
|
|
||||||
assert @command.validate([], %{}) == {:validation_failure, :not_enough_args}
|
|
||||||
end
|
|
||||||
|
|
||||||
test "validate: when one argument is provided, returns a failure" do
|
|
||||||
assert @command.validate(["stream-queue-a"], %{}) == {:validation_failure, :not_enough_args}
|
|
||||||
end
|
|
||||||
|
|
||||||
test "validate: when three or more arguments are provided, returns a failure" do
|
|
||||||
assert @command.validate(["stream-queue-a", "1D", "one-extra-arg"], %{}) ==
|
|
||||||
{:validation_failure, :too_many_args}
|
|
||||||
|
|
||||||
assert @command.validate(["stream-queue-a", "1D", "extra-arg", "another-extra-arg"], %{}) ==
|
|
||||||
{:validation_failure, :too_many_args}
|
|
||||||
end
|
|
||||||
|
|
||||||
test "validate: treats two positional arguments and default switches as a success" do
|
|
||||||
assert @command.validate(["stream-queue-a", "2Y"], %{}) == :ok
|
|
||||||
end
|
|
||||||
|
|
||||||
@tag test_timeout: 3000
|
|
||||||
test "run: targeting an unreachable node throws a badrpc" do
|
|
||||||
assert match?(
|
|
||||||
{:badrpc, _},
|
|
||||||
@command.run(
|
|
||||||
["stream-queue-a", "1Y"],
|
|
||||||
%{node: :jake@thedog, vhost: "/", timeout: 200}
|
|
||||||
)
|
|
||||||
)
|
|
||||||
end
|
|
||||||
|
|
||||||
test "run: targeting an unknown queue returns an error", context do
|
|
||||||
assert match?(
|
|
||||||
{:error, _},
|
|
||||||
@command.run(
|
|
||||||
["stream-queue-a", "1Y"],
|
|
||||||
Map.merge(context[:opts], %{vhost: "/"})
|
|
||||||
)
|
|
||||||
)
|
|
||||||
end
|
|
||||||
end
|
|
Loading…
Reference in New Issue