From 1be663ade32c9214b77ec0ba351d02aa694d5097 Mon Sep 17 00:00:00 2001 From: Azhar Ahmed Date: Sun, 13 Apr 2025 20:08:49 -0700 Subject: [PATCH] KAFKA-19071: Fix doc for remote.storage.enable (#19345) As of 3.9, Kafka allows disabling remote storage on a topic after it was enabled. It allows subsequent enabling and disabling too. However the documentation says otherwise and needs to be corrected. Doc: https://kafka.apache.org/39/documentation/#topicconfigs_remote.storage.enable Reviewers: Luke Chen , PoAn Yang , Ken Huang --- .../kafka/common/config/TopicConfig.java | 5 +-- .../kafka/admin/RemoteTopicCrudTest.scala | 31 ++++++++++++++++++- 2 files changed, 33 insertions(+), 3 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java index 1437577ed00..9788e1e66c6 100755 --- a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java +++ b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java @@ -80,8 +80,9 @@ public class TopicConfig { "Moreover, it triggers the rolling of new segment if the retention.ms condition is satisfied."; public static final String REMOTE_LOG_STORAGE_ENABLE_CONFIG = "remote.storage.enable"; - public static final String REMOTE_LOG_STORAGE_ENABLE_DOC = "To enable tiered storage for a topic, set this configuration as true. " + - "You can not disable this config once it is enabled. It will be provided in future versions."; + public static final String REMOTE_LOG_STORAGE_ENABLE_DOC = "To enable tiered storage for a topic, set this configuration to true. " + + "To disable tiered storage for a topic that has it enabled, set this configuration to false. " + + "When disabling, you must also set remote.log.delete.on.disable to true."; public static final String LOCAL_LOG_RETENTION_MS_CONFIG = "local.retention.ms"; public static final String LOCAL_LOG_RETENTION_MS_DOC = "The number of milliseconds to keep the local log segment before it gets deleted. " + diff --git a/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala b/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala index 1880024ac89..173355d6b6b 100644 --- a/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala +++ b/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala @@ -441,7 +441,36 @@ class RemoteTopicCrudTest extends IntegrationTestHarness { AlterConfigOp.OpType.SET), )) assertThrowsException(classOf[InvalidConfigurationException], - () => admin.incrementalAlterConfigs(configs).all().get(), "Disabling remote storage feature on the topic level is not supported.") + () => admin.incrementalAlterConfigs(configs).all().get(), "It is invalid to disable remote storage without deleting remote data. " + + "If you want to keep the remote data and turn to read only, please set `remote.storage.enable=true,remote.log.copy.disable=true`. " + + "If you want to disable remote storage and delete all remote data, please set `remote.storage.enable=false,remote.log.delete.on.disable=true`.") + } + + @ParameterizedTest + @ValueSource(strings = Array("kraft")) + def testUpdateTopicConfigWithDisablingRemoteStorageWithDeleteOnDisable(quorum: String): Unit = { + val admin = createAdminClient() + val topicConfig = new Properties + topicConfig.setProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true") + TestUtils.createTopicWithAdmin(admin, testTopicName, brokers, controllerServers, numPartitions, numReplicationFactor, + topicConfig = topicConfig) + + val configs = new util.HashMap[ConfigResource, util.Collection[AlterConfigOp]]() + configs.put(new ConfigResource(ConfigResource.Type.TOPIC, testTopicName), + util.Arrays.asList( + new AlterConfigOp(new ConfigEntry(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false"), + AlterConfigOp.OpType.SET), + new AlterConfigOp(new ConfigEntry(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, "true"), + AlterConfigOp.OpType.SET) + )) + admin.incrementalAlterConfigs(configs).all().get() + + val newProps = new Properties() + configs.get(new ConfigResource(ConfigResource.Type.TOPIC, testTopicName)).forEach { op => + newProps.setProperty(op.configEntry().name(), op.configEntry().value()) + } + + verifyRemoteLogTopicConfigs(newProps) } @ParameterizedTest