mirror of https://github.com/apache/kafka.git
KAFKA-19071: Fix doc for remote.storage.enable (#19345)
As of 3.9, Kafka allows disabling remote storage on a topic after it was enabled. It allows subsequent enabling and disabling too. However the documentation says otherwise and needs to be corrected. Doc: https://kafka.apache.org/39/documentation/#topicconfigs_remote.storage.enable Reviewers: Luke Chen <showuon@gmail.com>, PoAn Yang <payang@apache.org>, Ken Huang <s7133700@gmail.com>
This commit is contained in:
parent
6abe1ff51a
commit
1be663ade3
|
@ -80,8 +80,9 @@ public class TopicConfig {
|
||||||
"Moreover, it triggers the rolling of new segment if the retention.ms condition is satisfied.";
|
"Moreover, it triggers the rolling of new segment if the retention.ms condition is satisfied.";
|
||||||
|
|
||||||
public static final String REMOTE_LOG_STORAGE_ENABLE_CONFIG = "remote.storage.enable";
|
public static final String REMOTE_LOG_STORAGE_ENABLE_CONFIG = "remote.storage.enable";
|
||||||
public static final String REMOTE_LOG_STORAGE_ENABLE_DOC = "To enable tiered storage for a topic, set this configuration as true. " +
|
public static final String REMOTE_LOG_STORAGE_ENABLE_DOC = "To enable tiered storage for a topic, set this configuration to true. " +
|
||||||
"You can not disable this config once it is enabled. It will be provided in future versions.";
|
"To disable tiered storage for a topic that has it enabled, set this configuration to false. " +
|
||||||
|
"When disabling, you must also set <code>remote.log.delete.on.disable</code> to true.";
|
||||||
|
|
||||||
public static final String LOCAL_LOG_RETENTION_MS_CONFIG = "local.retention.ms";
|
public static final String LOCAL_LOG_RETENTION_MS_CONFIG = "local.retention.ms";
|
||||||
public static final String LOCAL_LOG_RETENTION_MS_DOC = "The number of milliseconds to keep the local log segment before it gets deleted. " +
|
public static final String LOCAL_LOG_RETENTION_MS_DOC = "The number of milliseconds to keep the local log segment before it gets deleted. " +
|
||||||
|
|
|
@ -441,7 +441,36 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
|
||||||
AlterConfigOp.OpType.SET),
|
AlterConfigOp.OpType.SET),
|
||||||
))
|
))
|
||||||
assertThrowsException(classOf[InvalidConfigurationException],
|
assertThrowsException(classOf[InvalidConfigurationException],
|
||||||
() => admin.incrementalAlterConfigs(configs).all().get(), "Disabling remote storage feature on the topic level is not supported.")
|
() => admin.incrementalAlterConfigs(configs).all().get(), "It is invalid to disable remote storage without deleting remote data. " +
|
||||||
|
"If you want to keep the remote data and turn to read only, please set `remote.storage.enable=true,remote.log.copy.disable=true`. " +
|
||||||
|
"If you want to disable remote storage and delete all remote data, please set `remote.storage.enable=false,remote.log.delete.on.disable=true`.")
|
||||||
|
}
|
||||||
|
|
||||||
|
@ParameterizedTest
|
||||||
|
@ValueSource(strings = Array("kraft"))
|
||||||
|
def testUpdateTopicConfigWithDisablingRemoteStorageWithDeleteOnDisable(quorum: String): Unit = {
|
||||||
|
val admin = createAdminClient()
|
||||||
|
val topicConfig = new Properties
|
||||||
|
topicConfig.setProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
|
||||||
|
TestUtils.createTopicWithAdmin(admin, testTopicName, brokers, controllerServers, numPartitions, numReplicationFactor,
|
||||||
|
topicConfig = topicConfig)
|
||||||
|
|
||||||
|
val configs = new util.HashMap[ConfigResource, util.Collection[AlterConfigOp]]()
|
||||||
|
configs.put(new ConfigResource(ConfigResource.Type.TOPIC, testTopicName),
|
||||||
|
util.Arrays.asList(
|
||||||
|
new AlterConfigOp(new ConfigEntry(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false"),
|
||||||
|
AlterConfigOp.OpType.SET),
|
||||||
|
new AlterConfigOp(new ConfigEntry(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, "true"),
|
||||||
|
AlterConfigOp.OpType.SET)
|
||||||
|
))
|
||||||
|
admin.incrementalAlterConfigs(configs).all().get()
|
||||||
|
|
||||||
|
val newProps = new Properties()
|
||||||
|
configs.get(new ConfigResource(ConfigResource.Type.TOPIC, testTopicName)).forEach { op =>
|
||||||
|
newProps.setProperty(op.configEntry().name(), op.configEntry().value())
|
||||||
|
}
|
||||||
|
|
||||||
|
verifyRemoteLogTopicConfigs(newProps)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest
|
@ParameterizedTest
|
||||||
|
|
Loading…
Reference in New Issue