KAFKA-16855: remote log disable policy in KRaft (#16653)

Reviewers: Kamal Chandraprakash <kamal.chandraprakash@gmail.com>, Christo Lolov <lolovc@amazon.com>
This commit is contained in:
Luke Chen 2024-08-03 17:38:41 +09:00
parent 38db4c46ff
commit b622121c0a
15 changed files with 409 additions and 112 deletions

View File

@ -93,17 +93,14 @@ public class TopicConfig {
"deletes the old segments. Default value is -2, it represents `retention.bytes` value to be used. The effective value should always be " +
"less than or equal to `retention.bytes` value.";
public static final String REMOTE_LOG_DISABLE_POLICY_RETAIN = "retain";
public static final String REMOTE_LOG_DISABLE_POLICY_DELETE = "delete";
public static final String REMOTE_LOG_COPY_DISABLE_CONFIG = "remote.log.copy.disable";
public static final String REMOTE_LOG_COPY_DISABLE_DOC = "Determines whether tiered data for a topic should become read only," +
" and no more data uploading on a topic.";
public static final String REMOTE_LOG_DISABLE_POLICY_CONFIG = "remote.log.disable.policy";
public static final String REMOTE_LOG_DISABLE_POLICY_DOC = String.format("Determines whether tiered data for a topic should be retained or " +
"deleted after tiered storage disablement on a topic. The two valid options are \"%s\" and \"%s\". If %s is " +
"selected then all data in remote will be kept post-disablement and will only be deleted when it breaches expiration " +
"thresholds. If %s is selected then the data will be made inaccessible immediately by advancing the log start offset and will be " +
"deleted asynchronously.", REMOTE_LOG_DISABLE_POLICY_RETAIN, REMOTE_LOG_DISABLE_POLICY_DELETE,
REMOTE_LOG_DISABLE_POLICY_RETAIN, REMOTE_LOG_DISABLE_POLICY_DELETE);
public static final String REMOTE_LOG_DELETE_ON_DISABLE_CONFIG = "remote.log.delete.on.disable";
public static final String REMOTE_LOG_DELETE_ON_DISABLE_DOC = "Determines whether tiered data for a topic should be " +
"deleted after tiered storage is disabled on a topic. This configuration should be enabled when trying to " +
"set `remote.storage.enable` from true to false";
public static final String MAX_MESSAGE_BYTES_CONFIG = "max.message.bytes";
public static final String MAX_MESSAGE_BYTES_DOC =

View File

@ -437,30 +437,48 @@ public class RemoteLogManager implements Closeable {
throw new KafkaException("RemoteLogManager is not configured when remote storage system is enabled");
}
Set<TopicIdPartition> leaderPartitions = filterPartitions(partitionsBecomeLeader)
.map(p -> new TopicIdPartition(topicIds.get(p.topic()), p.topicPartition())).collect(Collectors.toSet());
Map<TopicIdPartition, Boolean> leaderPartitions = filterPartitions(partitionsBecomeLeader)
.collect(Collectors.toMap(p -> new TopicIdPartition(topicIds.get(p.topic()), p.topicPartition()),
p -> p.log().exists(log -> log.config().remoteLogCopyDisable())));
Set<TopicIdPartition> followerPartitions = filterPartitions(partitionsBecomeFollower)
.map(p -> new TopicIdPartition(topicIds.get(p.topic()), p.topicPartition())).collect(Collectors.toSet());
Map<TopicIdPartition, Boolean> followerPartitions = filterPartitions(partitionsBecomeFollower)
.collect(Collectors.toMap(p -> new TopicIdPartition(topicIds.get(p.topic()), p.topicPartition()),
p -> p.log().exists(log -> log.config().remoteLogCopyDisable())));
if (!leaderPartitions.isEmpty() || !followerPartitions.isEmpty()) {
LOGGER.debug("Effective topic partitions after filtering compact and internal topics, leaders: {} and followers: {}",
leaderPartitions, followerPartitions);
leaderPartitions.forEach(this::cacheTopicPartitionIds);
followerPartitions.forEach(this::cacheTopicPartitionIds);
leaderPartitions.forEach((tp, __) -> cacheTopicPartitionIds(tp));
followerPartitions.forEach((tp, __) -> cacheTopicPartitionIds(tp));
remoteLogMetadataManager.onPartitionLeadershipChanges(leaderPartitions, followerPartitions);
followerPartitions.forEach(this::doHandleFollowerPartition);
remoteLogMetadataManager.onPartitionLeadershipChanges(leaderPartitions.keySet(), followerPartitions.keySet());
followerPartitions.forEach((tp, __) -> doHandleFollowerPartition(tp));
// If this node was the previous leader for the partition, then the RLMTask might be running in the
// background thread and might emit metrics. So, removing the metrics after marking this node as follower.
followerPartitions.forEach(this::removeRemoteTopicPartitionMetrics);
followerPartitions.forEach((tp, __) -> removeRemoteTopicPartitionMetrics(tp));
leaderPartitions.forEach(this::doHandleLeaderPartition);
}
}
public void stopLeaderCopyRLMTasks(Set<Partition> partitions) {
for (Partition partition : partitions) {
TopicPartition tp = partition.topicPartition();
if (topicIdByPartitionMap.containsKey(tp)) {
TopicIdPartition tpId = new TopicIdPartition(topicIdByPartitionMap.get(tp), tp);
leaderCopyRLMTasks.computeIfPresent(tpId, (topicIdPartition, task) -> {
LOGGER.info("Cancelling the copy RLM task for tpId: {}", tpId);
task.cancel();
LOGGER.info("Resetting remote copy lag metrics for tpId: {}", tpId);
((RLMCopyTask) task.rlmTask).resetLagStats();
return null;
});
}
}
}
/**
* Stop the remote-log-manager task for the given partitions. And, calls the
* {@link RemoteLogMetadataManager#onStopPartitions(Set)} when {@link StopPartition#deleteLocalLog()} is true.
@ -507,16 +525,18 @@ public class RemoteLogManager implements Closeable {
LOGGER.error("Error while stopping the partition: {}", stopPartition, ex);
}
}
// Note `deleteLocalLog` will always be true when `deleteRemoteLog` is true but not the other way around.
Set<TopicIdPartition> deleteLocalPartitions = stopPartitions.stream()
.filter(sp -> sp.deleteLocalLog() && topicIdByPartitionMap.containsKey(sp.topicPartition()))
// We want to remote topicId map and stopPartition on RLMM for deleteLocalLog or stopRLMM partitions because
// in both case, they all mean the topic will not be held in this broker anymore.
// NOTE: In ZK mode, this#stopPartitions method is called when Replica state changes to Offline and ReplicaDeletionStarted
Set<TopicIdPartition> pendingActionsPartitions = stopPartitions.stream()
.filter(sp -> (sp.stopRemoteLogMetadataManager() || sp.deleteLocalLog()) && topicIdByPartitionMap.containsKey(sp.topicPartition()))
.map(sp -> new TopicIdPartition(topicIdByPartitionMap.get(sp.topicPartition()), sp.topicPartition()))
.collect(Collectors.toSet());
if (!deleteLocalPartitions.isEmpty()) {
// NOTE: In ZK mode, this#stopPartitions method is called when Replica state changes to Offline and
// ReplicaDeletionStarted
remoteLogMetadataManager.onStopPartitions(deleteLocalPartitions);
deleteLocalPartitions.forEach(tpId -> topicIdByPartitionMap.remove(tpId.topicPartition()));
if (!pendingActionsPartitions.isEmpty()) {
pendingActionsPartitions.forEach(tpId -> topicIdByPartitionMap.remove(tpId.topicPartition()));
remoteLogMetadataManager.onStopPartitions(pendingActionsPartitions);
}
}
@ -985,6 +1005,13 @@ public class RemoteLogManager implements Closeable {
}
}
void resetLagStats() {
String topic = topicIdPartition.topic();
int partition = topicIdPartition.partition();
brokerTopicStats.recordRemoteCopyLagBytes(topic, partition, 0);
brokerTopicStats.recordRemoteCopyLagSegments(topic, partition, 0);
}
private Path toPathIfExists(File file) {
return file.exists() ? file.toPath() : null;
}
@ -1793,20 +1820,23 @@ public class RemoteLogManager implements Closeable {
new RemoteLogReader(fetchInfo, this, callback, brokerTopicStats, rlmFetchQuotaManager, remoteReadTimer));
}
void doHandleLeaderPartition(TopicIdPartition topicPartition) {
void doHandleLeaderPartition(TopicIdPartition topicPartition, Boolean remoteLogCopyDisable) {
RLMTaskWithFuture followerRLMTaskWithFuture = followerRLMTasks.remove(topicPartition);
if (followerRLMTaskWithFuture != null) {
LOGGER.info("Cancelling the follower task: {}", followerRLMTaskWithFuture.rlmTask);
followerRLMTaskWithFuture.cancel();
}
leaderCopyRLMTasks.computeIfAbsent(topicPartition, topicIdPartition -> {
RLMCopyTask task = new RLMCopyTask(topicIdPartition, this.rlmConfig.remoteLogMetadataCustomMetadataMaxBytes());
// set this upfront when it is getting initialized instead of doing it after scheduling.
LOGGER.info("Created a new copy task: {} and getting scheduled", task);
ScheduledFuture<?> future = rlmCopyThreadPool.scheduleWithFixedDelay(task, 0, delayInMs, TimeUnit.MILLISECONDS);
return new RLMTaskWithFuture(task, future);
});
// Only create copy task when remoteLogCopyDisable is disabled
if (!remoteLogCopyDisable) {
leaderCopyRLMTasks.computeIfAbsent(topicPartition, topicIdPartition -> {
RLMCopyTask task = new RLMCopyTask(topicIdPartition, this.rlmConfig.remoteLogMetadataCustomMetadataMaxBytes());
// set this upfront when it is getting initialized instead of doing it after scheduling.
LOGGER.info("Created a new copy task: {} and getting scheduled", task);
ScheduledFuture<?> future = rlmCopyThreadPool.scheduleWithFixedDelay(task, 0, delayInMs, TimeUnit.MILLISECONDS);
return new RLMTaskWithFuture(task, future);
});
}
leaderExpirationRLMTasks.computeIfAbsent(topicPartition, topicIdPartition -> {
RLMExpirationTask task = new RLMExpirationTask(topicIdPartition);

View File

@ -964,15 +964,23 @@ class LogManager(logDirs: Seq[File],
*/
def updateTopicConfig(topic: String,
newTopicConfig: Properties,
isRemoteLogStorageSystemEnabled: Boolean): Unit = {
isRemoteLogStorageSystemEnabled: Boolean,
wasRemoteLogEnabled: Boolean,
fromZK: Boolean): Unit = {
topicConfigUpdated(topic)
val logs = logsByTopic(topic)
// Combine the default properties with the overrides in zk to create the new LogConfig
val newLogConfig = LogConfig.fromProps(currentDefaultConfig.originals, newTopicConfig)
val isRemoteLogStorageEnabled = newLogConfig.remoteStorageEnable()
// We would like to validate the configuration no matter whether the logs have materialised on disk or not.
// Otherwise we risk someone creating a tiered-topic, disabling Tiered Storage cluster-wide and the check
// failing since the logs for the topic are non-existent.
LogConfig.validateRemoteStorageOnlyIfSystemEnabled(newLogConfig.values(), isRemoteLogStorageSystemEnabled, true)
// `remote.log.delete.on.disable` and `remote.log.copy.disable` are unsupported in ZK mode
if (fromZK) {
LogConfig.validateNoInvalidRemoteStorageConfigsInZK(newLogConfig.values())
}
LogConfig.validateTurningOffRemoteStorageWithDelete(newLogConfig.values(), wasRemoteLogEnabled, isRemoteLogStorageEnabled)
if (logs.nonEmpty) {
logs.foreach { log =>
val oldLogConfig = log.updateConfig(newLogConfig)

View File

@ -26,14 +26,14 @@ import kafka.server.Constants._
import kafka.server.QuotaFactory.QuotaManagers
import kafka.utils.Implicits._
import kafka.utils.Logging
import org.apache.kafka.server.config.{ReplicationConfigs, QuotaConfigs, ZooKeeperInternals}
import org.apache.kafka.server.config.{QuotaConfigs, ReplicationConfigs, ZooKeeperInternals}
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.metrics.Quota
import org.apache.kafka.common.metrics.Quota._
import org.apache.kafka.common.utils.Sanitizer
import org.apache.kafka.security.CredentialProvider
import org.apache.kafka.server.ClientMetricsManager
import org.apache.kafka.storage.internals.log.ThrottledReplicaListValidator
import org.apache.kafka.storage.internals.log.{LogStartOffsetIncrementReason, ThrottledReplicaListValidator}
import org.apache.kafka.storage.internals.log.LogConfig.MessageFormatVersion
import scala.annotation.nowarn
@ -68,25 +68,61 @@ class TopicConfigHandler(private val replicaManager: ReplicaManager,
}
val logs = logManager.logsByTopic(topic)
val wasRemoteLogEnabledBeforeUpdate = logs.exists(_.remoteLogEnabled())
val wasRemoteLogEnabled = logs.exists(_.remoteLogEnabled())
val wasCopyDisabled = logs.exists(_.config.remoteLogCopyDisable())
logManager.updateTopicConfig(topic, props, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
maybeBootstrapRemoteLogComponents(topic, logs, wasRemoteLogEnabledBeforeUpdate)
// kafkaController is only defined in Zookeeper's mode
logManager.updateTopicConfig(topic, props, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(),
wasRemoteLogEnabled, kafkaController.isDefined)
maybeUpdateRemoteLogComponents(topic, logs, wasRemoteLogEnabled, wasCopyDisabled)
}
private[server] def maybeBootstrapRemoteLogComponents(topic: String,
logs: Seq[UnifiedLog],
wasRemoteLogEnabledBeforeUpdate: Boolean): Unit = {
private[server] def maybeUpdateRemoteLogComponents(topic: String,
logs: Seq[UnifiedLog],
wasRemoteLogEnabled: Boolean,
wasCopyDisabled: Boolean): Unit = {
val isRemoteLogEnabled = logs.exists(_.remoteLogEnabled())
val isCopyDisabled = logs.exists(_.config.remoteLogCopyDisable())
val isDeleteOnDisable = logs.exists(_.config.remoteLogDeleteOnDisable())
val (leaderPartitions, followerPartitions) =
logs.flatMap(log => replicaManager.onlinePartition(log.topicPartition)).partition(_.isLeader)
// Topic configs gets updated incrementally. This check is added to prevent redundant updates.
if (!wasRemoteLogEnabledBeforeUpdate && isRemoteLogEnabled) {
val (leaderPartitions, followerPartitions) =
logs.flatMap(log => replicaManager.onlinePartition(log.topicPartition)).partition(_.isLeader)
// When remote log is enabled, or remote copy is enabled, we should create RLM tasks accordingly via `onLeadershipChange`.
if (isRemoteLogEnabled && (!wasRemoteLogEnabled || (wasCopyDisabled && !isCopyDisabled))) {
val topicIds = Collections.singletonMap(topic, replicaManager.metadataCache.getTopicId(topic))
replicaManager.remoteLogManager.foreach(rlm =>
rlm.onLeadershipChange(leaderPartitions.toSet.asJava, followerPartitions.toSet.asJava, topicIds))
} else if (wasRemoteLogEnabledBeforeUpdate && !isRemoteLogEnabled) {
warn(s"Disabling remote log on the topic: $topic is not supported.")
}
// When copy disabled, we should stop leaderCopyRLMTask, but keep expirationTask
if (isRemoteLogEnabled && !wasCopyDisabled && isCopyDisabled) {
replicaManager.remoteLogManager.foreach(rlm => {
rlm.stopLeaderCopyRLMTasks(leaderPartitions.toSet.asJava);
})
}
// Disabling remote log storage on this topic
if (wasRemoteLogEnabled && !isRemoteLogEnabled && isDeleteOnDisable) {
val stopPartitions: java.util.HashSet[StopPartition] = new java.util.HashSet[StopPartition]()
leaderPartitions.foreach(partition => {
// delete remote logs and stop RemoteLogMetadataManager
stopPartitions.add(StopPartition(partition.topicPartition, deleteLocalLog = false,
deleteRemoteLog = true, stopRemoteLogMetadataManager = true))
})
followerPartitions.foreach(partition => {
// we need to cancel follower tasks and stop RemoteLogMetadataManager
stopPartitions.add(StopPartition(partition.topicPartition, deleteLocalLog = false,
deleteRemoteLog = false, stopRemoteLogMetadataManager = true))
})
// update the log start offset to local log start offset for the leader replicas
logs.filter(log => leaderPartitions.exists(p => p.topicPartition.equals(log.topicPartition)))
.foreach(log => log.maybeIncrementLogStartOffset(log.localLogStartOffset(), LogStartOffsetIncrementReason.SegmentDeletion))
replicaManager.remoteLogManager.foreach(rlm => rlm.stopPartitions(stopPartitions, (_, _) => {}))
}
}

View File

@ -109,7 +109,7 @@ class ControllerConfigurationValidator(kafkaConfig: KafkaConfig) extends Configu
nullTopicConfigs.mkString(","))
}
LogConfig.validate(oldConfigs, properties, kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), false)
case BROKER => validateBrokerName(resource.name())
case CLIENT_METRICS =>
val properties = new Properties()

View File

@ -99,7 +99,10 @@ case class LogDeleteRecordsResult(requestedOffset: Long, lowWatermark: Long, exc
}
}
case class StopPartition(topicPartition: TopicPartition, deleteLocalLog: Boolean, deleteRemoteLog: Boolean = false)
case class StopPartition(topicPartition: TopicPartition,
deleteLocalLog: Boolean,
deleteRemoteLog: Boolean = false,
stopRemoteLogMetadataManager: Boolean = false)
/**
* Result metadata of a log read operation on the log

View File

@ -163,7 +163,8 @@ class AdminZkClient(zkClient: KafkaZkClient,
LogConfig.validate(Collections.emptyMap(), config,
kafkaConfig.map(_.extractLogConfigMap).getOrElse(Collections.emptyMap()),
kafkaConfig.exists(_.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
kafkaConfig.exists(_.remoteLogManagerConfig.isRemoteStorageSystemEnabled()),
true)
}
private def writeTopicPartitionAssignment(topic: String, replicaAssignment: Map[Int, ReplicaAssignment],
@ -481,7 +482,7 @@ class AdminZkClient(zkClient: KafkaZkClient,
// remove the topic overrides
LogConfig.validate(Collections.emptyMap(), configs,
kafkaConfig.map(_.extractLogConfigMap).getOrElse(Collections.emptyMap()),
kafkaConfig.exists(_.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
kafkaConfig.exists(_.remoteLogManagerConfig.isRemoteStorageSystemEnabled()), true)
}
/**

View File

@ -1309,12 +1309,12 @@ public class RemoteLogManagerTest {
verifyInCache(followerTopicIdPartition, leaderTopicIdPartition);
// Evicts from topicId cache
remoteLogManager.stopPartitions(Collections.singleton(new StopPartition(leaderTopicIdPartition.topicPartition(), true, true)), (tp, ex) -> { });
remoteLogManager.stopPartitions(Collections.singleton(new StopPartition(leaderTopicIdPartition.topicPartition(), true, true, true)), (tp, ex) -> { });
verifyNotInCache(leaderTopicIdPartition);
verifyInCache(followerTopicIdPartition);
// Evicts from topicId cache
remoteLogManager.stopPartitions(Collections.singleton(new StopPartition(followerTopicIdPartition.topicPartition(), true, true)), (tp, ex) -> { });
remoteLogManager.stopPartitions(Collections.singleton(new StopPartition(followerTopicIdPartition.topicPartition(), true, true, true)), (tp, ex) -> { });
verifyNotInCache(leaderTopicIdPartition, followerTopicIdPartition);
}
@ -1344,7 +1344,7 @@ public class RemoteLogManagerTest {
spyRemoteLogManager.onLeadershipChange(
Collections.singleton(mockPartition(leaderTopicIdPartition)), Collections.emptySet(), topicIds);
verify(spyRemoteLogManager).doHandleLeaderPartition(eq(leaderTopicIdPartition));
verify(spyRemoteLogManager).doHandleLeaderPartition(eq(leaderTopicIdPartition), eq(false));
}
private MemoryRecords records(long timestamp,
@ -1837,8 +1837,8 @@ public class RemoteLogManagerTest {
remoteLogManager.startup();
BiConsumer<TopicPartition, Throwable> errorHandler = (topicPartition, throwable) -> fail("shouldn't be called");
Set<StopPartition> partitions = new HashSet<>();
partitions.add(new StopPartition(leaderTopicIdPartition.topicPartition(), true, false));
partitions.add(new StopPartition(followerTopicIdPartition.topicPartition(), true, false));
partitions.add(new StopPartition(leaderTopicIdPartition.topicPartition(), true, false, false));
partitions.add(new StopPartition(followerTopicIdPartition.topicPartition(), true, false, false));
remoteLogManager.onLeadershipChange(Collections.singleton(mockPartition(leaderTopicIdPartition)),
Collections.singleton(mockPartition(followerTopicIdPartition)), topicIds);
assertNotNull(remoteLogManager.leaderCopyTask(leaderTopicIdPartition));
@ -1860,8 +1860,8 @@ public class RemoteLogManagerTest {
BiConsumer<TopicPartition, Throwable> errorHandler =
(topicPartition, ex) -> fail("shouldn't be called: " + ex);
Set<StopPartition> partitions = new HashSet<>();
partitions.add(new StopPartition(leaderTopicIdPartition.topicPartition(), true, true));
partitions.add(new StopPartition(followerTopicIdPartition.topicPartition(), true, true));
partitions.add(new StopPartition(leaderTopicIdPartition.topicPartition(), true, true, true));
partitions.add(new StopPartition(followerTopicIdPartition.topicPartition(), true, true, true));
remoteLogManager.onLeadershipChange(Collections.singleton(mockPartition(leaderTopicIdPartition)),
Collections.singleton(mockPartition(followerTopicIdPartition)), topicIds);
assertNotNull(remoteLogManager.leaderCopyTask(leaderTopicIdPartition));
@ -3208,6 +3208,7 @@ public class RemoteLogManagerTest {
when(partition.topic()).thenReturn(tp.topic());
when(log.remoteLogEnabled()).thenReturn(true);
when(partition.log()).thenReturn(Option.apply(log));
when(log.config()).thenReturn(new LogConfig(new Properties()));
return partition;
}

View File

@ -156,12 +156,13 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
topicConfig = topicConfig))
}
// `remote.log.delete.on.disable` and `remote.log.copy.disable` only works in KRaft mode.
@ParameterizedTest
@CsvSource(Array("zk,retain", "zk,delete", "kraft,retain", "kraft,delete"))
def testCreateRemoteTopicWithDisablePolicyRetain(quorum: String, policy: String): Unit = {
@CsvSource(Array("kraft,true,true", "kraft,true,false", "kraft,false,true", "kraft,false,false"))
def testCreateRemoteTopicWithCopyDisabledAndDeleteOnDisable(quorum: String, copyDisabled: Boolean, deleteOnDisable: Boolean): Unit = {
val topicConfig = new Properties()
topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
topicConfig.put(TopicConfig.REMOTE_LOG_DISABLE_POLICY_CONFIG, policy)
topicConfig.put(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, copyDisabled.toString)
topicConfig.put(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, deleteOnDisable.toString)
TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, controllerServers, numPartitions, numReplicationFactor,
topicConfig = topicConfig)
verifyRemoteLogTopicConfigs(topicConfig)
@ -311,6 +312,35 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
() => admin.incrementalAlterConfigs(configs).all().get(), "Disabling remote storage feature on the topic level is not supported.")
}
@ParameterizedTest
@ValueSource(strings = Array("zk"))
def testUpdateInvalidRemoteStorageConfigUnderZK(quorum: String): Unit = {
val admin = createAdminClient()
val errorMsg = "It is invalid to set `remote.log.delete.on.disable` or `remote.log.copy.disable` under Zookeeper's mode."
val topicConfig = new Properties
topicConfig.setProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
TestUtils.createTopicWithAdmin(admin, testTopicName, brokers, controllerServers, numPartitions, numReplicationFactor,
topicConfig = topicConfig)
val configs = new util.HashMap[ConfigResource, util.Collection[AlterConfigOp]]()
configs.put(new ConfigResource(ConfigResource.Type.TOPIC, testTopicName),
util.Arrays.asList(
new AlterConfigOp(new ConfigEntry(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, "true"),
AlterConfigOp.OpType.SET),
))
assertThrowsException(classOf[InvalidConfigurationException],
() => admin.incrementalAlterConfigs(configs).all().get(), errorMsg)
configs.clear()
configs.put(new ConfigResource(ConfigResource.Type.TOPIC, testTopicName),
util.Arrays.asList(
new AlterConfigOp(new ConfigEntry(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, "true"),
AlterConfigOp.OpType.SET),
))
assertThrowsException(classOf[InvalidConfigurationException],
() => admin.incrementalAlterConfigs(configs).all().get(), errorMsg)
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testTopicDeletion(quorum: String): Unit = {
@ -409,10 +439,15 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
topicConfig.getProperty(TopicConfig.RETENTION_BYTES_CONFIG).toLong ==
logBuffer.head.config.retentionSize
}
if (topicConfig.contains(TopicConfig.REMOTE_LOG_DISABLE_POLICY_CONFIG)) {
if (topicConfig.contains(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG)) {
result = result &&
topicConfig.getProperty(TopicConfig.REMOTE_LOG_DISABLE_POLICY_CONFIG).equals(
logBuffer.head.config.remoteLogDisablePolicy())
topicConfig.getProperty(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG).toBoolean ==
logBuffer.head.config.remoteLogCopyDisable()
}
if (topicConfig.contains(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG)) {
result = result &&
topicConfig.getProperty(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG).toBoolean ==
logBuffer.head.config.remoteLogDeleteOnDisable()
}
}
result

View File

@ -100,7 +100,8 @@ class LogConfigTest {
case TopicConfig.COMPRESSION_GZIP_LEVEL_CONFIG => assertPropertyInvalid(name, "not_a_number", "-2")
case TopicConfig.COMPRESSION_LZ4_LEVEL_CONFIG => assertPropertyInvalid(name, "not_a_number", "-1")
case TopicConfig.COMPRESSION_ZSTD_LEVEL_CONFIG => assertPropertyInvalid(name, "not_a_number", "-0.1")
case TopicConfig.REMOTE_LOG_DISABLE_POLICY_CONFIG => assertPropertyInvalid(name, "not_a_number", "remove", "0", "true")
case TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG => assertPropertyInvalid(name, "not_a_number", "remove", "0")
case TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG => assertPropertyInvalid(name, "not_a_number", "remove", "0")
case _ => assertPropertyInvalid(name, "not_a_number", "-1")
})
@ -300,7 +301,7 @@ class LogConfigTest {
props.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, localRetentionMs.toString)
props.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, localRetentionBytes.toString)
assertThrows(classOf[ConfigException],
() => LogConfig.validate(Collections.emptyMap(), props, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
() => LogConfig.validate(Collections.emptyMap(), props, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true))
}
@Test
@ -312,17 +313,17 @@ class LogConfigTest {
val logProps = new Properties()
logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE)
logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true)
logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT)
assertThrows(classOf[ConfigException],
() => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
() => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true))
logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, "delete,compact")
assertThrows(classOf[ConfigException],
() => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
() => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true))
logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, "compact,delete")
assertThrows(classOf[ConfigException],
() => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
() => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true))
}
@ParameterizedTest(name = "testEnableRemoteLogStorage with sysRemoteStorageEnabled: {0}")
@ -335,10 +336,10 @@ class LogConfigTest {
val logProps = new Properties()
logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
if (sysRemoteStorageEnabled) {
LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true)
} else {
val message = assertThrows(classOf[ConfigException],
() => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
() => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true))
assertTrue(message.getMessage.contains("Tiered Storage functionality is disabled in the broker"))
}
}
@ -355,11 +356,20 @@ class LogConfigTest {
if (wasRemoteStorageEnabled) {
val message = assertThrows(classOf[InvalidConfigurationException],
() => LogConfig.validate(Collections.singletonMap(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true"),
logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
assertTrue(message.getMessage.contains("Disabling remote storage feature on the topic level is not supported."))
logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), false))
assertTrue(message.getMessage.contains("It is invalid to disable remote storage without deleting remote data. " +
"If you want to keep the remote data and turn to read only, please set `remote.storage.enable=true,remote.log.copy.disable=true`. " +
"If you want to disable remote storage and delete all remote data, please set `remote.storage.enable=false,remote.log.delete.on.disable=true`."))
// It should be able to disable the remote log storage when delete on disable is set to true
logProps.put(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, "true")
LogConfig.validate(Collections.singletonMap(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true"),
logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), false)
} else {
LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
LogConfig.validate(Collections.singletonMap(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false"), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true)
LogConfig.validate(Collections.singletonMap(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false"), logProps,
kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), false)
}
}
@ -378,10 +388,12 @@ class LogConfigTest {
logProps.put(TopicConfig.RETENTION_MS_CONFIG, "500")
if (sysRemoteStorageEnabled) {
val message = assertThrows(classOf[ConfigException],
() => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
() => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true))
assertTrue(message.getMessage.contains(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG))
} else {
LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true)
}
}
@ -400,10 +412,12 @@ class LogConfigTest {
logProps.put(TopicConfig.RETENTION_BYTES_CONFIG, "128")
if (sysRemoteStorageEnabled) {
val message = assertThrows(classOf[ConfigException],
() => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
() => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true))
assertTrue(message.getMessage.contains(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG))
} else {
LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true)
}
}
@ -426,19 +440,34 @@ class LogConfigTest {
}
@ParameterizedTest
@ValueSource(strings = Array(TopicConfig.REMOTE_LOG_DISABLE_POLICY_RETAIN, TopicConfig.REMOTE_LOG_DISABLE_POLICY_DELETE))
def testValidRemoteLogDisablePolicy(policy: String): Unit = {
@ValueSource(booleans = Array(true, false))
def testValidRemoteLogCopyDisabled(copyDisabled: Boolean): Unit = {
val logProps = new Properties
logProps.put(TopicConfig.REMOTE_LOG_DISABLE_POLICY_CONFIG, policy)
logProps.put(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, copyDisabled.toString)
LogConfig.validate(logProps)
}
@ParameterizedTest
@ValueSource(strings = Array("keep", "remove"))
def testInvalidRemoteLogDisablePolicy(policy: String): Unit = {
@ValueSource(booleans = Array(true, false))
def testValidRemoteLogDeleteOnDisable(deleteOnDisable: Boolean): Unit = {
val logProps = new Properties
logProps.put(TopicConfig.REMOTE_LOG_DISABLE_POLICY_CONFIG, policy)
assertThrows(classOf[ConfigException], () => LogConfig.validate(logProps))
logProps.put(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, deleteOnDisable.toString)
LogConfig.validate(logProps)
}
@ParameterizedTest
@ValueSource(strings = Array(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG))
def testInValidRemoteConfigsInZK(configKey: String): Unit = {
val kafkaProps = TestUtils.createDummyBrokerConfig()
kafkaProps.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true")
val kafkaConfig = KafkaConfig.fromProps(kafkaProps)
val logProps = new Properties
logProps.put(configKey, "true")
val message = assertThrows(classOf[InvalidConfigurationException],
() => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, true, true))
assertTrue(message.getMessage.contains("It is invalid to set `remote.log.delete.on.disable` or " +
"`remote.log.copy.disable` under Zookeeper's mode."))
}
/* Verify that when the deprecated config LOG_MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG has non default value the new configs

View File

@ -797,7 +797,7 @@ class LogManagerTest {
val newProperties = new Properties()
newProperties.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE)
spyLogManager.updateTopicConfig(topic, newProperties, isRemoteLogStorageSystemEnabled = false)
spyLogManager.updateTopicConfig(topic, newProperties, isRemoteLogStorageSystemEnabled = false, wasRemoteLogEnabled = false, fromZK = false)
assertTrue(log0.config.delete)
assertTrue(log1.config.delete)

View File

@ -93,7 +93,9 @@ class ControllerConfigurationValidatorTest {
val config = new util.TreeMap[String, String]()
config.put(REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false")
if (wasRemoteStorageEnabled) {
assertEquals("Disabling remote storage feature on the topic level is not supported.",
assertEquals("It is invalid to disable remote storage without deleting remote data. " +
"If you want to keep the remote data and turn to read only, please set `remote.storage.enable=true,remote.log.copy.disable=true`. " +
"If you want to disable remote storage and delete all remote data, please set `remote.storage.enable=false,remote.log.delete.on.disable=true`.",
assertThrows(classOf[InvalidConfigurationException], () => validator.validate(
new ConfigResource(TOPIC, "foo"), config, util.Collections.singletonMap(REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true"))).getMessage)
} else {

View File

@ -38,6 +38,7 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.server.common.MetadataVersion.IBP_3_0_IV1
import org.apache.kafka.server.config.{ConfigType, QuotaConfigs, ServerLogConfigs, ZooKeeperInternals}
import org.apache.kafka.storage.internals.log.LogConfig
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{Test, Timeout}
import org.junit.jupiter.params.ParameterizedTest
@ -659,6 +660,7 @@ class DynamicConfigChangeUnitTest {
when(log0.remoteLogEnabled()).thenReturn(true)
when(partition0.isLeader).thenReturn(true)
when(replicaManager.onlinePartition(tp0)).thenReturn(Some(partition0))
when(log0.config).thenReturn(new LogConfig(Collections.emptyMap()))
val tp1 = new TopicPartition(topic, 1)
val log1: UnifiedLog = mock(classOf[UnifiedLog])
@ -667,6 +669,7 @@ class DynamicConfigChangeUnitTest {
when(log1.remoteLogEnabled()).thenReturn(true)
when(partition1.isLeader).thenReturn(false)
when(replicaManager.onlinePartition(tp1)).thenReturn(Some(partition1))
when(log1.config).thenReturn(new LogConfig(Collections.emptyMap()))
val leaderPartitionsArg: ArgumentCaptor[util.Set[Partition]] = ArgumentCaptor.forClass(classOf[util.Set[Partition]])
val followerPartitionsArg: ArgumentCaptor[util.Set[Partition]] = ArgumentCaptor.forClass(classOf[util.Set[Partition]])
@ -674,7 +677,7 @@ class DynamicConfigChangeUnitTest {
val isRemoteLogEnabledBeforeUpdate = false
val configHandler: TopicConfigHandler = new TopicConfigHandler(replicaManager, null, null, None)
configHandler.maybeBootstrapRemoteLogComponents(topic, Seq(log0, log1), isRemoteLogEnabledBeforeUpdate)
configHandler.maybeUpdateRemoteLogComponents(topic, Seq(log0, log1), isRemoteLogEnabledBeforeUpdate, false)
assertEquals(Collections.singleton(partition0), leaderPartitionsArg.getValue)
assertEquals(Collections.singleton(partition1), followerPartitionsArg.getValue)
}
@ -682,17 +685,23 @@ class DynamicConfigChangeUnitTest {
@Test
def testEnableRemoteLogStorageOnTopicOnAlreadyEnabledTopic(): Unit = {
val topic = "test-topic"
val tp0 = new TopicPartition(topic, 0)
val rlm: RemoteLogManager = mock(classOf[RemoteLogManager])
val replicaManager: ReplicaManager = mock(classOf[ReplicaManager])
val partition: Partition = mock(classOf[Partition])
when(replicaManager.remoteLogManager).thenReturn(Some(rlm))
when(replicaManager.onlinePartition(tp0)).thenReturn(Some(partition))
val log0: UnifiedLog = mock(classOf[UnifiedLog])
when(log0.remoteLogEnabled()).thenReturn(true)
doNothing().when(rlm).onLeadershipChange(any(), any(), any())
when(log0.config).thenReturn(new LogConfig(Collections.emptyMap()))
when(log0.topicPartition).thenReturn(tp0)
when(partition.isLeader).thenReturn(true)
val isRemoteLogEnabledBeforeUpdate = true
val configHandler: TopicConfigHandler = new TopicConfigHandler(replicaManager, null, null, None)
configHandler.maybeBootstrapRemoteLogComponents(topic, Seq(log0), isRemoteLogEnabledBeforeUpdate)
configHandler.maybeUpdateRemoteLogComponents(topic, Seq(log0), isRemoteLogEnabledBeforeUpdate, false)
verify(rlm, never()).onLeadershipChange(any(), any(), any())
}
}

View File

@ -113,13 +113,15 @@ public class LogConfig extends AbstractConfig {
public static class RemoteLogConfig {
public final boolean remoteStorageEnable;
public final String remoteLogDisablePolicy;
public final boolean remoteLogDeleteOnDisable;
public final boolean remoteLogCopyDisable;
public final long localRetentionMs;
public final long localRetentionBytes;
private RemoteLogConfig(LogConfig config) {
this.remoteStorageEnable = config.getBoolean(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG);
this.remoteLogDisablePolicy = config.getString(TopicConfig.REMOTE_LOG_DISABLE_POLICY_CONFIG);
this.remoteLogCopyDisable = config.getBoolean(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG);
this.remoteLogDeleteOnDisable = config.getBoolean(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG);
this.localRetentionMs = config.getLong(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG);
this.localRetentionBytes = config.getLong(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG);
}
@ -128,7 +130,8 @@ public class LogConfig extends AbstractConfig {
public String toString() {
return "RemoteLogConfig{" +
"remoteStorageEnable=" + remoteStorageEnable +
", remoteLogDisablePolicy=" + remoteLogDisablePolicy +
", remoteLogCopyDisable=" + remoteLogCopyDisable +
", remoteLogDeleteOnDisable=" + remoteLogDeleteOnDisable +
", localRetentionMs=" + localRetentionMs +
", localRetentionBytes=" + localRetentionBytes +
'}';
@ -204,7 +207,8 @@ public class LogConfig extends AbstractConfig {
// Visible for testing
public static final Set<String> CONFIGS_WITH_NO_SERVER_DEFAULTS = Collections.unmodifiableSet(Utils.mkSet(
TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG,
TopicConfig.REMOTE_LOG_DISABLE_POLICY_CONFIG,
TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG,
TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG,
QuotaConfigs.LEADER_REPLICATION_THROTTLED_REPLICAS_CONFIG,
QuotaConfigs.FOLLOWER_REPLICATION_THROTTLED_REPLICAS_CONFIG
));
@ -325,9 +329,8 @@ public class LogConfig extends AbstractConfig {
TopicConfig.LOCAL_LOG_RETENTION_MS_DOC)
.define(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, LONG, DEFAULT_LOCAL_RETENTION_BYTES, atLeast(-2), MEDIUM,
TopicConfig.LOCAL_LOG_RETENTION_BYTES_DOC)
.define(TopicConfig.REMOTE_LOG_DISABLE_POLICY_CONFIG, STRING, TopicConfig.REMOTE_LOG_DISABLE_POLICY_RETAIN,
in(TopicConfig.REMOTE_LOG_DISABLE_POLICY_RETAIN, TopicConfig.REMOTE_LOG_DISABLE_POLICY_DELETE),
MEDIUM, TopicConfig.REMOTE_LOG_DISABLE_POLICY_DOC);
.define(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, BOOLEAN, false, MEDIUM, TopicConfig.REMOTE_LOG_COPY_DISABLE_DOC)
.define(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, BOOLEAN, false, MEDIUM, TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_DOC);
}
public final Set<String> overriddenConfigs;
@ -508,8 +511,12 @@ public class LogConfig extends AbstractConfig {
return remoteLogConfig.remoteStorageEnable;
}
public String remoteLogDisablePolicy() {
return remoteLogConfig.remoteLogDisablePolicy;
public Boolean remoteLogDeleteOnDisable() {
return remoteLogConfig.remoteLogDeleteOnDisable;
}
public Boolean remoteLogCopyDisable() {
return remoteLogConfig.remoteLogCopyDisable;
}
public long localRetentionMs() {
@ -613,11 +620,17 @@ public class LogConfig extends AbstractConfig {
* @param existingConfigs The existing properties
* @param newConfigs The new properties to be validated
* @param isRemoteLogStorageSystemEnabled true if system wise remote log storage is enabled
* @param fromZK true if this is a ZK cluster
*/
private static void validateTopicLogConfigValues(Map<String, String> existingConfigs,
Map<?, ?> newConfigs,
boolean isRemoteLogStorageSystemEnabled) {
boolean isRemoteLogStorageSystemEnabled,
boolean fromZK) {
validateValues(newConfigs);
if (fromZK) {
validateNoInvalidRemoteStorageConfigsInZK(newConfigs);
}
boolean isRemoteLogStorageEnabled = (Boolean) newConfigs.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG);
if (isRemoteLogStorageEnabled) {
validateRemoteStorageOnlyIfSystemEnabled(newConfigs, isRemoteLogStorageSystemEnabled, false);
@ -626,14 +639,26 @@ public class LogConfig extends AbstractConfig {
validateRemoteStorageRetentionTime(newConfigs);
} else {
// The new config "remote.storage.enable" is false, validate if it's turning from true to false
validateNotTurningOffRemoteStorage(existingConfigs);
boolean wasRemoteLogEnabled = Boolean.parseBoolean(existingConfigs.getOrDefault(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false"));
validateTurningOffRemoteStorageWithDelete(newConfigs, wasRemoteLogEnabled, isRemoteLogStorageEnabled);
}
}
public static void validateNotTurningOffRemoteStorage(Map<String, String> existingConfigs) {
boolean wasRemoteLogEnabledBeforeUpdate = Boolean.parseBoolean(existingConfigs.getOrDefault(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false"));
if (wasRemoteLogEnabledBeforeUpdate) {
throw new InvalidConfigurationException("Disabling remote storage feature on the topic level is not supported.");
public static void validateTurningOffRemoteStorageWithDelete(Map<?, ?> newConfigs, boolean wasRemoteLogEnabled, boolean isRemoteLogStorageEnabled) {
boolean isRemoteLogDeleteOnDisable = (Boolean) Utils.castToStringObjectMap(newConfigs).getOrDefault(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, false);
if (wasRemoteLogEnabled && !isRemoteLogStorageEnabled && !isRemoteLogDeleteOnDisable) {
throw new InvalidConfigurationException("It is invalid to disable remote storage without deleting remote data. " +
"If you want to keep the remote data and turn to read only, please set `remote.storage.enable=true,remote.log.copy.disable=true`. " +
"If you want to disable remote storage and delete all remote data, please set `remote.storage.enable=false,remote.log.delete.on.disable=true`.");
}
}
public static void validateNoInvalidRemoteStorageConfigsInZK(Map<?, ?> newConfigs) {
boolean isRemoteLogDeleteOnDisable = (Boolean) Utils.castToStringObjectMap(newConfigs).getOrDefault(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, false);
boolean isRemoteLogCopyDisabled = (Boolean) Utils.castToStringObjectMap(newConfigs).getOrDefault(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, false);
if (isRemoteLogDeleteOnDisable || isRemoteLogCopyDisabled) {
throw new InvalidConfigurationException("It is invalid to set `remote.log.delete.on.disable` or " +
"`remote.log.copy.disable` under Zookeeper's mode.");
}
}
@ -694,13 +719,14 @@ public class LogConfig extends AbstractConfig {
* Check that the given properties contain only valid log config names and that all values can be parsed and are valid
*/
public static void validate(Properties props) {
validate(Collections.emptyMap(), props, Collections.emptyMap(), false);
validate(Collections.emptyMap(), props, Collections.emptyMap(), false, false);
}
public static void validate(Map<String, String> existingConfigs,
Properties props,
Map<?, ?> configuredProps,
boolean isRemoteLogStorageSystemEnabled) {
boolean isRemoteLogStorageSystemEnabled,
boolean fromZK) {
validateNames(props);
if (configuredProps == null || configuredProps.isEmpty()) {
Map<?, ?> valueMaps = CONFIG.parse(props);
@ -709,7 +735,7 @@ public class LogConfig extends AbstractConfig {
Map<Object, Object> combinedConfigs = new HashMap<>(configuredProps);
combinedConfigs.putAll(props);
Map<?, ?> valueMaps = CONFIG.parse(combinedConfigs);
validateTopicLogConfigValues(existingConfigs, valueMaps, isRemoteLogStorageSystemEnabled);
validateTopicLogConfigValues(existingConfigs, valueMaps, isRemoteLogStorageSystemEnabled, fromZK);
}
}

View File

@ -0,0 +1,120 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.tiered.storage.integration;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.DELETE_SEGMENT;
public final class DisableRemoteLogOnTopicTest extends TieredStorageTestHarness {
@Override
public int brokerCount() {
return 2;
}
@ParameterizedTest(name = "{displayName}.quorum={0}")
@ValueSource(strings = {"kraft"})
public void executeTieredStorageTest(String quorum) {
super.executeTieredStorageTest(quorum);
}
@Override
protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
final Integer broker0 = 0;
final Integer broker1 = 1;
final String topicA = "topicA";
final Integer p0 = 0;
final Integer partitionCount = 1;
final Integer replicationFactor = 2;
final Integer maxBatchCountPerSegment = 1;
final boolean enableRemoteLogStorage = true;
final Map<Integer, List<Integer>> assignment = mkMap(
mkEntry(p0, Arrays.asList(broker0, broker1))
);
final Map<String, String> disableCopy = new HashMap<>();
disableCopy.put(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, "true");
final Map<String, String> deleteOnDisable = new HashMap<>();
deleteOnDisable.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false");
deleteOnDisable.put(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, "true");
builder
.createTopic(topicA, partitionCount, replicationFactor, maxBatchCountPerSegment, assignment,
enableRemoteLogStorage)
// send records to partition 0
.expectSegmentToBeOffloaded(broker0, topicA, p0, 0, new KeyValueSpec("k0", "v0"))
.expectSegmentToBeOffloaded(broker0, topicA, p0, 1, new KeyValueSpec("k1", "v1"))
.expectEarliestLocalOffsetInLogDirectory(topicA, p0, 2L)
.produce(topicA, p0, new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1", "v1"),
new KeyValueSpec("k2", "v2"))
// disable remote log copy
.updateTopicConfig(topicA,
Collections.singletonMap(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, "true"),
Collections.emptyList())
// make sure we can still consume from the beginning of the topic to read data from local and remote storage
.expectFetchFromTieredStorage(broker0, topicA, p0, 2)
.consume(topicA, p0, 0L, 3, 2)
// re-enable remote log copy
.updateTopicConfig(topicA,
Collections.singletonMap(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, "false"),
Collections.emptyList())
// make sure the logs can be offloaded
.expectEarliestLocalOffsetInLogDirectory(topicA, p0, 3L)
.produce(topicA, p0, new KeyValueSpec("k3", "v3"))
// explicitly disable remote log copy
.updateTopicConfig(topicA,
disableCopy,
Collections.emptyList())
// make sure we can still consume from the beginning of the topic to read data from local and remote storage
.expectFetchFromTieredStorage(broker0, topicA, p0, 3)
.consume(topicA, p0, 0L, 4, 3)
// verify the remote retention policy is working.
// Use DELETE_RECORDS API to delete the records upto offset 1 and expect 1 remote segment to be deleted
.expectDeletionInRemoteStorage(broker0, topicA, p0, DELETE_SEGMENT, 1)
.deleteRecords(topicA, p0, 1L)
.waitForRemoteLogSegmentDeletion(topicA)
// disabling remote log on topicA and enabling deleteOnDisable
.updateTopicConfig(topicA,
deleteOnDisable,
Collections.emptyList())
// make sure all remote data is deleted
.expectEmptyRemoteStorage(topicA, p0)
// verify the local log is still consumable
.consume(topicA, p0, 3L, 1, 0);
}
}