diff --git a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java index 3689227d1fa..04c6c487cd0 100755 --- a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java +++ b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java @@ -93,17 +93,14 @@ public class TopicConfig { "deletes the old segments. Default value is -2, it represents `retention.bytes` value to be used. The effective value should always be " + "less than or equal to `retention.bytes` value."; - public static final String REMOTE_LOG_DISABLE_POLICY_RETAIN = "retain"; - public static final String REMOTE_LOG_DISABLE_POLICY_DELETE = "delete"; + public static final String REMOTE_LOG_COPY_DISABLE_CONFIG = "remote.log.copy.disable"; + public static final String REMOTE_LOG_COPY_DISABLE_DOC = "Determines whether tiered data for a topic should become read only," + + " and no more data uploading on a topic."; - public static final String REMOTE_LOG_DISABLE_POLICY_CONFIG = "remote.log.disable.policy"; - - public static final String REMOTE_LOG_DISABLE_POLICY_DOC = String.format("Determines whether tiered data for a topic should be retained or " + - "deleted after tiered storage disablement on a topic. The two valid options are \"%s\" and \"%s\". If %s is " + - "selected then all data in remote will be kept post-disablement and will only be deleted when it breaches expiration " + - "thresholds. If %s is selected then the data will be made inaccessible immediately by advancing the log start offset and will be " + - "deleted asynchronously.", REMOTE_LOG_DISABLE_POLICY_RETAIN, REMOTE_LOG_DISABLE_POLICY_DELETE, - REMOTE_LOG_DISABLE_POLICY_RETAIN, REMOTE_LOG_DISABLE_POLICY_DELETE); + public static final String REMOTE_LOG_DELETE_ON_DISABLE_CONFIG = "remote.log.delete.on.disable"; + public static final String REMOTE_LOG_DELETE_ON_DISABLE_DOC = "Determines whether tiered data for a topic should be " + + "deleted after tiered storage is disabled on a topic. This configuration should be enabled when trying to " + + "set `remote.storage.enable` from true to false"; public static final String MAX_MESSAGE_BYTES_CONFIG = "max.message.bytes"; public static final String MAX_MESSAGE_BYTES_DOC = diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java b/core/src/main/java/kafka/log/remote/RemoteLogManager.java index f3f0ccc11ad..f53f5a35623 100644 --- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java +++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java @@ -437,30 +437,48 @@ public class RemoteLogManager implements Closeable { throw new KafkaException("RemoteLogManager is not configured when remote storage system is enabled"); } - Set leaderPartitions = filterPartitions(partitionsBecomeLeader) - .map(p -> new TopicIdPartition(topicIds.get(p.topic()), p.topicPartition())).collect(Collectors.toSet()); + Map leaderPartitions = filterPartitions(partitionsBecomeLeader) + .collect(Collectors.toMap(p -> new TopicIdPartition(topicIds.get(p.topic()), p.topicPartition()), + p -> p.log().exists(log -> log.config().remoteLogCopyDisable()))); - Set followerPartitions = filterPartitions(partitionsBecomeFollower) - .map(p -> new TopicIdPartition(topicIds.get(p.topic()), p.topicPartition())).collect(Collectors.toSet()); + Map followerPartitions = filterPartitions(partitionsBecomeFollower) + .collect(Collectors.toMap(p -> new TopicIdPartition(topicIds.get(p.topic()), p.topicPartition()), + p -> p.log().exists(log -> log.config().remoteLogCopyDisable()))); if (!leaderPartitions.isEmpty() || !followerPartitions.isEmpty()) { LOGGER.debug("Effective topic partitions after filtering compact and internal topics, leaders: {} and followers: {}", leaderPartitions, followerPartitions); - leaderPartitions.forEach(this::cacheTopicPartitionIds); - followerPartitions.forEach(this::cacheTopicPartitionIds); + leaderPartitions.forEach((tp, __) -> cacheTopicPartitionIds(tp)); + followerPartitions.forEach((tp, __) -> cacheTopicPartitionIds(tp)); - remoteLogMetadataManager.onPartitionLeadershipChanges(leaderPartitions, followerPartitions); - followerPartitions.forEach(this::doHandleFollowerPartition); + remoteLogMetadataManager.onPartitionLeadershipChanges(leaderPartitions.keySet(), followerPartitions.keySet()); + followerPartitions.forEach((tp, __) -> doHandleFollowerPartition(tp)); // If this node was the previous leader for the partition, then the RLMTask might be running in the // background thread and might emit metrics. So, removing the metrics after marking this node as follower. - followerPartitions.forEach(this::removeRemoteTopicPartitionMetrics); + followerPartitions.forEach((tp, __) -> removeRemoteTopicPartitionMetrics(tp)); leaderPartitions.forEach(this::doHandleLeaderPartition); } } + public void stopLeaderCopyRLMTasks(Set partitions) { + for (Partition partition : partitions) { + TopicPartition tp = partition.topicPartition(); + if (topicIdByPartitionMap.containsKey(tp)) { + TopicIdPartition tpId = new TopicIdPartition(topicIdByPartitionMap.get(tp), tp); + leaderCopyRLMTasks.computeIfPresent(tpId, (topicIdPartition, task) -> { + LOGGER.info("Cancelling the copy RLM task for tpId: {}", tpId); + task.cancel(); + LOGGER.info("Resetting remote copy lag metrics for tpId: {}", tpId); + ((RLMCopyTask) task.rlmTask).resetLagStats(); + return null; + }); + } + } + } + /** * Stop the remote-log-manager task for the given partitions. And, calls the * {@link RemoteLogMetadataManager#onStopPartitions(Set)} when {@link StopPartition#deleteLocalLog()} is true. @@ -507,16 +525,18 @@ public class RemoteLogManager implements Closeable { LOGGER.error("Error while stopping the partition: {}", stopPartition, ex); } } - // Note `deleteLocalLog` will always be true when `deleteRemoteLog` is true but not the other way around. - Set deleteLocalPartitions = stopPartitions.stream() - .filter(sp -> sp.deleteLocalLog() && topicIdByPartitionMap.containsKey(sp.topicPartition())) + + // We want to remote topicId map and stopPartition on RLMM for deleteLocalLog or stopRLMM partitions because + // in both case, they all mean the topic will not be held in this broker anymore. + // NOTE: In ZK mode, this#stopPartitions method is called when Replica state changes to Offline and ReplicaDeletionStarted + Set pendingActionsPartitions = stopPartitions.stream() + .filter(sp -> (sp.stopRemoteLogMetadataManager() || sp.deleteLocalLog()) && topicIdByPartitionMap.containsKey(sp.topicPartition())) .map(sp -> new TopicIdPartition(topicIdByPartitionMap.get(sp.topicPartition()), sp.topicPartition())) .collect(Collectors.toSet()); - if (!deleteLocalPartitions.isEmpty()) { - // NOTE: In ZK mode, this#stopPartitions method is called when Replica state changes to Offline and - // ReplicaDeletionStarted - remoteLogMetadataManager.onStopPartitions(deleteLocalPartitions); - deleteLocalPartitions.forEach(tpId -> topicIdByPartitionMap.remove(tpId.topicPartition())); + + if (!pendingActionsPartitions.isEmpty()) { + pendingActionsPartitions.forEach(tpId -> topicIdByPartitionMap.remove(tpId.topicPartition())); + remoteLogMetadataManager.onStopPartitions(pendingActionsPartitions); } } @@ -986,6 +1006,13 @@ public class RemoteLogManager implements Closeable { } } + void resetLagStats() { + String topic = topicIdPartition.topic(); + int partition = topicIdPartition.partition(); + brokerTopicStats.recordRemoteCopyLagBytes(topic, partition, 0); + brokerTopicStats.recordRemoteCopyLagSegments(topic, partition, 0); + } + private Path toPathIfExists(File file) { return file.exists() ? file.toPath() : null; } @@ -1794,20 +1821,23 @@ public class RemoteLogManager implements Closeable { new RemoteLogReader(fetchInfo, this, callback, brokerTopicStats, rlmFetchQuotaManager, remoteReadTimer)); } - void doHandleLeaderPartition(TopicIdPartition topicPartition) { + void doHandleLeaderPartition(TopicIdPartition topicPartition, Boolean remoteLogCopyDisable) { RLMTaskWithFuture followerRLMTaskWithFuture = followerRLMTasks.remove(topicPartition); if (followerRLMTaskWithFuture != null) { LOGGER.info("Cancelling the follower task: {}", followerRLMTaskWithFuture.rlmTask); followerRLMTaskWithFuture.cancel(); } - leaderCopyRLMTasks.computeIfAbsent(topicPartition, topicIdPartition -> { - RLMCopyTask task = new RLMCopyTask(topicIdPartition, this.rlmConfig.remoteLogMetadataCustomMetadataMaxBytes()); - // set this upfront when it is getting initialized instead of doing it after scheduling. - LOGGER.info("Created a new copy task: {} and getting scheduled", task); - ScheduledFuture future = rlmCopyThreadPool.scheduleWithFixedDelay(task, 0, delayInMs, TimeUnit.MILLISECONDS); - return new RLMTaskWithFuture(task, future); - }); + // Only create copy task when remoteLogCopyDisable is disabled + if (!remoteLogCopyDisable) { + leaderCopyRLMTasks.computeIfAbsent(topicPartition, topicIdPartition -> { + RLMCopyTask task = new RLMCopyTask(topicIdPartition, this.rlmConfig.remoteLogMetadataCustomMetadataMaxBytes()); + // set this upfront when it is getting initialized instead of doing it after scheduling. + LOGGER.info("Created a new copy task: {} and getting scheduled", task); + ScheduledFuture future = rlmCopyThreadPool.scheduleWithFixedDelay(task, 0, delayInMs, TimeUnit.MILLISECONDS); + return new RLMTaskWithFuture(task, future); + }); + } leaderExpirationRLMTasks.computeIfAbsent(topicPartition, topicIdPartition -> { RLMExpirationTask task = new RLMExpirationTask(topicIdPartition); diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index 95d2062bf89..66fedbfae6d 100755 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -964,15 +964,23 @@ class LogManager(logDirs: Seq[File], */ def updateTopicConfig(topic: String, newTopicConfig: Properties, - isRemoteLogStorageSystemEnabled: Boolean): Unit = { + isRemoteLogStorageSystemEnabled: Boolean, + wasRemoteLogEnabled: Boolean, + fromZK: Boolean): Unit = { topicConfigUpdated(topic) val logs = logsByTopic(topic) // Combine the default properties with the overrides in zk to create the new LogConfig val newLogConfig = LogConfig.fromProps(currentDefaultConfig.originals, newTopicConfig) + val isRemoteLogStorageEnabled = newLogConfig.remoteStorageEnable() // We would like to validate the configuration no matter whether the logs have materialised on disk or not. // Otherwise we risk someone creating a tiered-topic, disabling Tiered Storage cluster-wide and the check // failing since the logs for the topic are non-existent. LogConfig.validateRemoteStorageOnlyIfSystemEnabled(newLogConfig.values(), isRemoteLogStorageSystemEnabled, true) + // `remote.log.delete.on.disable` and `remote.log.copy.disable` are unsupported in ZK mode + if (fromZK) { + LogConfig.validateNoInvalidRemoteStorageConfigsInZK(newLogConfig.values()) + } + LogConfig.validateTurningOffRemoteStorageWithDelete(newLogConfig.values(), wasRemoteLogEnabled, isRemoteLogStorageEnabled) if (logs.nonEmpty) { logs.foreach { log => val oldLogConfig = log.updateConfig(newLogConfig) diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala b/core/src/main/scala/kafka/server/ConfigHandler.scala index 2d478cfc30a..9f183b63ea6 100644 --- a/core/src/main/scala/kafka/server/ConfigHandler.scala +++ b/core/src/main/scala/kafka/server/ConfigHandler.scala @@ -26,14 +26,14 @@ import kafka.server.Constants._ import kafka.server.QuotaFactory.QuotaManagers import kafka.utils.Implicits._ import kafka.utils.Logging -import org.apache.kafka.server.config.{ReplicationConfigs, QuotaConfigs, ZooKeeperInternals} +import org.apache.kafka.server.config.{QuotaConfigs, ReplicationConfigs, ZooKeeperInternals} import org.apache.kafka.common.config.TopicConfig import org.apache.kafka.common.metrics.Quota import org.apache.kafka.common.metrics.Quota._ import org.apache.kafka.common.utils.Sanitizer import org.apache.kafka.security.CredentialProvider import org.apache.kafka.server.ClientMetricsManager -import org.apache.kafka.storage.internals.log.ThrottledReplicaListValidator +import org.apache.kafka.storage.internals.log.{LogStartOffsetIncrementReason, ThrottledReplicaListValidator} import org.apache.kafka.storage.internals.log.LogConfig.MessageFormatVersion import scala.annotation.nowarn @@ -68,25 +68,61 @@ class TopicConfigHandler(private val replicaManager: ReplicaManager, } val logs = logManager.logsByTopic(topic) - val wasRemoteLogEnabledBeforeUpdate = logs.exists(_.remoteLogEnabled()) + val wasRemoteLogEnabled = logs.exists(_.remoteLogEnabled()) + val wasCopyDisabled = logs.exists(_.config.remoteLogCopyDisable()) - logManager.updateTopicConfig(topic, props, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) - maybeBootstrapRemoteLogComponents(topic, logs, wasRemoteLogEnabledBeforeUpdate) + // kafkaController is only defined in Zookeeper's mode + logManager.updateTopicConfig(topic, props, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), + wasRemoteLogEnabled, kafkaController.isDefined) + maybeUpdateRemoteLogComponents(topic, logs, wasRemoteLogEnabled, wasCopyDisabled) } - private[server] def maybeBootstrapRemoteLogComponents(topic: String, - logs: Seq[UnifiedLog], - wasRemoteLogEnabledBeforeUpdate: Boolean): Unit = { + private[server] def maybeUpdateRemoteLogComponents(topic: String, + logs: Seq[UnifiedLog], + wasRemoteLogEnabled: Boolean, + wasCopyDisabled: Boolean): Unit = { val isRemoteLogEnabled = logs.exists(_.remoteLogEnabled()) + val isCopyDisabled = logs.exists(_.config.remoteLogCopyDisable()) + val isDeleteOnDisable = logs.exists(_.config.remoteLogDeleteOnDisable()) + + val (leaderPartitions, followerPartitions) = + logs.flatMap(log => replicaManager.onlinePartition(log.topicPartition)).partition(_.isLeader) + // Topic configs gets updated incrementally. This check is added to prevent redundant updates. - if (!wasRemoteLogEnabledBeforeUpdate && isRemoteLogEnabled) { - val (leaderPartitions, followerPartitions) = - logs.flatMap(log => replicaManager.onlinePartition(log.topicPartition)).partition(_.isLeader) + // When remote log is enabled, or remote copy is enabled, we should create RLM tasks accordingly via `onLeadershipChange`. + if (isRemoteLogEnabled && (!wasRemoteLogEnabled || (wasCopyDisabled && !isCopyDisabled))) { val topicIds = Collections.singletonMap(topic, replicaManager.metadataCache.getTopicId(topic)) replicaManager.remoteLogManager.foreach(rlm => rlm.onLeadershipChange(leaderPartitions.toSet.asJava, followerPartitions.toSet.asJava, topicIds)) - } else if (wasRemoteLogEnabledBeforeUpdate && !isRemoteLogEnabled) { - warn(s"Disabling remote log on the topic: $topic is not supported.") + } + + // When copy disabled, we should stop leaderCopyRLMTask, but keep expirationTask + if (isRemoteLogEnabled && !wasCopyDisabled && isCopyDisabled) { + replicaManager.remoteLogManager.foreach(rlm => { + rlm.stopLeaderCopyRLMTasks(leaderPartitions.toSet.asJava); + }) + } + + // Disabling remote log storage on this topic + if (wasRemoteLogEnabled && !isRemoteLogEnabled && isDeleteOnDisable) { + val stopPartitions: java.util.HashSet[StopPartition] = new java.util.HashSet[StopPartition]() + leaderPartitions.foreach(partition => { + // delete remote logs and stop RemoteLogMetadataManager + stopPartitions.add(StopPartition(partition.topicPartition, deleteLocalLog = false, + deleteRemoteLog = true, stopRemoteLogMetadataManager = true)) + }) + + followerPartitions.foreach(partition => { + // we need to cancel follower tasks and stop RemoteLogMetadataManager + stopPartitions.add(StopPartition(partition.topicPartition, deleteLocalLog = false, + deleteRemoteLog = false, stopRemoteLogMetadataManager = true)) + }) + + // update the log start offset to local log start offset for the leader replicas + logs.filter(log => leaderPartitions.exists(p => p.topicPartition.equals(log.topicPartition))) + .foreach(log => log.maybeIncrementLogStartOffset(log.localLogStartOffset(), LogStartOffsetIncrementReason.SegmentDeletion)) + + replicaManager.remoteLogManager.foreach(rlm => rlm.stopPartitions(stopPartitions, (_, _) => {})) } } diff --git a/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala b/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala index 06a60e30076..49083e3a0cd 100644 --- a/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala +++ b/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala @@ -109,7 +109,7 @@ class ControllerConfigurationValidator(kafkaConfig: KafkaConfig) extends Configu nullTopicConfigs.mkString(",")) } LogConfig.validate(oldConfigs, properties, kafkaConfig.extractLogConfigMap, - kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) + kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), false) case BROKER => validateBrokerName(resource.name()) case CLIENT_METRICS => val properties = new Properties() diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 6ebd151a032..11f2e4ff03c 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -99,7 +99,10 @@ case class LogDeleteRecordsResult(requestedOffset: Long, lowWatermark: Long, exc } } -case class StopPartition(topicPartition: TopicPartition, deleteLocalLog: Boolean, deleteRemoteLog: Boolean = false) +case class StopPartition(topicPartition: TopicPartition, + deleteLocalLog: Boolean, + deleteRemoteLog: Boolean = false, + stopRemoteLogMetadataManager: Boolean = false) /** * Result metadata of a log read operation on the log diff --git a/core/src/main/scala/kafka/zk/AdminZkClient.scala b/core/src/main/scala/kafka/zk/AdminZkClient.scala index 8db20583e24..15c95b998b6 100644 --- a/core/src/main/scala/kafka/zk/AdminZkClient.scala +++ b/core/src/main/scala/kafka/zk/AdminZkClient.scala @@ -163,7 +163,8 @@ class AdminZkClient(zkClient: KafkaZkClient, LogConfig.validate(Collections.emptyMap(), config, kafkaConfig.map(_.extractLogConfigMap).getOrElse(Collections.emptyMap()), - kafkaConfig.exists(_.remoteLogManagerConfig.isRemoteStorageSystemEnabled())) + kafkaConfig.exists(_.remoteLogManagerConfig.isRemoteStorageSystemEnabled()), + true) } private def writeTopicPartitionAssignment(topic: String, replicaAssignment: Map[Int, ReplicaAssignment], @@ -481,7 +482,7 @@ class AdminZkClient(zkClient: KafkaZkClient, // remove the topic overrides LogConfig.validate(Collections.emptyMap(), configs, kafkaConfig.map(_.extractLogConfigMap).getOrElse(Collections.emptyMap()), - kafkaConfig.exists(_.remoteLogManagerConfig.isRemoteStorageSystemEnabled())) + kafkaConfig.exists(_.remoteLogManagerConfig.isRemoteStorageSystemEnabled()), true) } /** diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java index 4e5f70f9337..f27e39c1f1d 100644 --- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java +++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java @@ -1309,12 +1309,12 @@ public class RemoteLogManagerTest { verifyInCache(followerTopicIdPartition, leaderTopicIdPartition); // Evicts from topicId cache - remoteLogManager.stopPartitions(Collections.singleton(new StopPartition(leaderTopicIdPartition.topicPartition(), true, true)), (tp, ex) -> { }); + remoteLogManager.stopPartitions(Collections.singleton(new StopPartition(leaderTopicIdPartition.topicPartition(), true, true, true)), (tp, ex) -> { }); verifyNotInCache(leaderTopicIdPartition); verifyInCache(followerTopicIdPartition); // Evicts from topicId cache - remoteLogManager.stopPartitions(Collections.singleton(new StopPartition(followerTopicIdPartition.topicPartition(), true, true)), (tp, ex) -> { }); + remoteLogManager.stopPartitions(Collections.singleton(new StopPartition(followerTopicIdPartition.topicPartition(), true, true, true)), (tp, ex) -> { }); verifyNotInCache(leaderTopicIdPartition, followerTopicIdPartition); } @@ -1344,7 +1344,7 @@ public class RemoteLogManagerTest { spyRemoteLogManager.onLeadershipChange( Collections.singleton(mockPartition(leaderTopicIdPartition)), Collections.emptySet(), topicIds); - verify(spyRemoteLogManager).doHandleLeaderPartition(eq(leaderTopicIdPartition)); + verify(spyRemoteLogManager).doHandleLeaderPartition(eq(leaderTopicIdPartition), eq(false)); } private MemoryRecords records(long timestamp, @@ -1837,8 +1837,8 @@ public class RemoteLogManagerTest { remoteLogManager.startup(); BiConsumer errorHandler = (topicPartition, throwable) -> fail("shouldn't be called"); Set partitions = new HashSet<>(); - partitions.add(new StopPartition(leaderTopicIdPartition.topicPartition(), true, false)); - partitions.add(new StopPartition(followerTopicIdPartition.topicPartition(), true, false)); + partitions.add(new StopPartition(leaderTopicIdPartition.topicPartition(), true, false, false)); + partitions.add(new StopPartition(followerTopicIdPartition.topicPartition(), true, false, false)); remoteLogManager.onLeadershipChange(Collections.singleton(mockPartition(leaderTopicIdPartition)), Collections.singleton(mockPartition(followerTopicIdPartition)), topicIds); assertNotNull(remoteLogManager.leaderCopyTask(leaderTopicIdPartition)); @@ -1860,8 +1860,8 @@ public class RemoteLogManagerTest { BiConsumer errorHandler = (topicPartition, ex) -> fail("shouldn't be called: " + ex); Set partitions = new HashSet<>(); - partitions.add(new StopPartition(leaderTopicIdPartition.topicPartition(), true, true)); - partitions.add(new StopPartition(followerTopicIdPartition.topicPartition(), true, true)); + partitions.add(new StopPartition(leaderTopicIdPartition.topicPartition(), true, true, true)); + partitions.add(new StopPartition(followerTopicIdPartition.topicPartition(), true, true, true)); remoteLogManager.onLeadershipChange(Collections.singleton(mockPartition(leaderTopicIdPartition)), Collections.singleton(mockPartition(followerTopicIdPartition)), topicIds); assertNotNull(remoteLogManager.leaderCopyTask(leaderTopicIdPartition)); @@ -3208,6 +3208,7 @@ public class RemoteLogManagerTest { when(partition.topic()).thenReturn(tp.topic()); when(log.remoteLogEnabled()).thenReturn(true); when(partition.log()).thenReturn(Option.apply(log)); + when(log.config()).thenReturn(new LogConfig(new Properties())); return partition; } diff --git a/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala b/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala index 462698734a0..25f366bfc94 100644 --- a/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala +++ b/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala @@ -156,12 +156,13 @@ class RemoteTopicCrudTest extends IntegrationTestHarness { topicConfig = topicConfig)) } + // `remote.log.delete.on.disable` and `remote.log.copy.disable` only works in KRaft mode. @ParameterizedTest - @CsvSource(Array("zk,retain", "zk,delete", "kraft,retain", "kraft,delete")) - def testCreateRemoteTopicWithDisablePolicyRetain(quorum: String, policy: String): Unit = { + @CsvSource(Array("kraft,true,true", "kraft,true,false", "kraft,false,true", "kraft,false,false")) + def testCreateRemoteTopicWithCopyDisabledAndDeleteOnDisable(quorum: String, copyDisabled: Boolean, deleteOnDisable: Boolean): Unit = { val topicConfig = new Properties() - topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true") - topicConfig.put(TopicConfig.REMOTE_LOG_DISABLE_POLICY_CONFIG, policy) + topicConfig.put(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, copyDisabled.toString) + topicConfig.put(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, deleteOnDisable.toString) TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, controllerServers, numPartitions, numReplicationFactor, topicConfig = topicConfig) verifyRemoteLogTopicConfigs(topicConfig) @@ -311,6 +312,35 @@ class RemoteTopicCrudTest extends IntegrationTestHarness { () => admin.incrementalAlterConfigs(configs).all().get(), "Disabling remote storage feature on the topic level is not supported.") } + @ParameterizedTest + @ValueSource(strings = Array("zk")) + def testUpdateInvalidRemoteStorageConfigUnderZK(quorum: String): Unit = { + val admin = createAdminClient() + val errorMsg = "It is invalid to set `remote.log.delete.on.disable` or `remote.log.copy.disable` under Zookeeper's mode." + val topicConfig = new Properties + topicConfig.setProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true") + TestUtils.createTopicWithAdmin(admin, testTopicName, brokers, controllerServers, numPartitions, numReplicationFactor, + topicConfig = topicConfig) + + val configs = new util.HashMap[ConfigResource, util.Collection[AlterConfigOp]]() + configs.put(new ConfigResource(ConfigResource.Type.TOPIC, testTopicName), + util.Arrays.asList( + new AlterConfigOp(new ConfigEntry(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, "true"), + AlterConfigOp.OpType.SET), + )) + assertThrowsException(classOf[InvalidConfigurationException], + () => admin.incrementalAlterConfigs(configs).all().get(), errorMsg) + + configs.clear() + configs.put(new ConfigResource(ConfigResource.Type.TOPIC, testTopicName), + util.Arrays.asList( + new AlterConfigOp(new ConfigEntry(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, "true"), + AlterConfigOp.OpType.SET), + )) + assertThrowsException(classOf[InvalidConfigurationException], + () => admin.incrementalAlterConfigs(configs).all().get(), errorMsg) + } + @ParameterizedTest @ValueSource(strings = Array("zk", "kraft")) def testTopicDeletion(quorum: String): Unit = { @@ -409,10 +439,15 @@ class RemoteTopicCrudTest extends IntegrationTestHarness { topicConfig.getProperty(TopicConfig.RETENTION_BYTES_CONFIG).toLong == logBuffer.head.config.retentionSize } - if (topicConfig.contains(TopicConfig.REMOTE_LOG_DISABLE_POLICY_CONFIG)) { + if (topicConfig.contains(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG)) { result = result && - topicConfig.getProperty(TopicConfig.REMOTE_LOG_DISABLE_POLICY_CONFIG).equals( - logBuffer.head.config.remoteLogDisablePolicy()) + topicConfig.getProperty(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG).toBoolean == + logBuffer.head.config.remoteLogCopyDisable() + } + if (topicConfig.contains(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG)) { + result = result && + topicConfig.getProperty(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG).toBoolean == + logBuffer.head.config.remoteLogDeleteOnDisable() } } result diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala index 4e7e4e23b38..8126bb08b07 100644 --- a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala @@ -100,7 +100,8 @@ class LogConfigTest { case TopicConfig.COMPRESSION_GZIP_LEVEL_CONFIG => assertPropertyInvalid(name, "not_a_number", "-2") case TopicConfig.COMPRESSION_LZ4_LEVEL_CONFIG => assertPropertyInvalid(name, "not_a_number", "-1") case TopicConfig.COMPRESSION_ZSTD_LEVEL_CONFIG => assertPropertyInvalid(name, "not_a_number", "-0.1") - case TopicConfig.REMOTE_LOG_DISABLE_POLICY_CONFIG => assertPropertyInvalid(name, "not_a_number", "remove", "0", "true") + case TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG => assertPropertyInvalid(name, "not_a_number", "remove", "0") + case TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG => assertPropertyInvalid(name, "not_a_number", "remove", "0") case _ => assertPropertyInvalid(name, "not_a_number", "-1") }) @@ -300,7 +301,7 @@ class LogConfigTest { props.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, localRetentionMs.toString) props.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, localRetentionBytes.toString) assertThrows(classOf[ConfigException], - () => LogConfig.validate(Collections.emptyMap(), props, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())) + () => LogConfig.validate(Collections.emptyMap(), props, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true)) } @Test @@ -312,17 +313,17 @@ class LogConfigTest { val logProps = new Properties() logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE) logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true") - LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) + LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true) logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT) assertThrows(classOf[ConfigException], - () => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())) + () => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true)) logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, "delete,compact") assertThrows(classOf[ConfigException], - () => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())) + () => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true)) logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, "compact,delete") assertThrows(classOf[ConfigException], - () => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())) + () => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true)) } @ParameterizedTest(name = "testEnableRemoteLogStorage with sysRemoteStorageEnabled: {0}") @@ -335,10 +336,10 @@ class LogConfigTest { val logProps = new Properties() logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true") if (sysRemoteStorageEnabled) { - LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) + LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true) } else { val message = assertThrows(classOf[ConfigException], - () => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())) + () => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true)) assertTrue(message.getMessage.contains("Tiered Storage functionality is disabled in the broker")) } } @@ -355,11 +356,20 @@ class LogConfigTest { if (wasRemoteStorageEnabled) { val message = assertThrows(classOf[InvalidConfigurationException], () => LogConfig.validate(Collections.singletonMap(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true"), - logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())) - assertTrue(message.getMessage.contains("Disabling remote storage feature on the topic level is not supported.")) + logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), false)) + assertTrue(message.getMessage.contains("It is invalid to disable remote storage without deleting remote data. " + + "If you want to keep the remote data and turn to read only, please set `remote.storage.enable=true,remote.log.copy.disable=true`. " + + "If you want to disable remote storage and delete all remote data, please set `remote.storage.enable=false,remote.log.delete.on.disable=true`.")) + + + // It should be able to disable the remote log storage when delete on disable is set to true + logProps.put(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, "true") + LogConfig.validate(Collections.singletonMap(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true"), + logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), false) } else { - LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) - LogConfig.validate(Collections.singletonMap(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false"), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) + LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true) + LogConfig.validate(Collections.singletonMap(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false"), logProps, + kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), false) } } @@ -378,10 +388,12 @@ class LogConfigTest { logProps.put(TopicConfig.RETENTION_MS_CONFIG, "500") if (sysRemoteStorageEnabled) { val message = assertThrows(classOf[ConfigException], - () => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())) + () => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, + kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true)) assertTrue(message.getMessage.contains(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG)) } else { - LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) + LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, + kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true) } } @@ -400,10 +412,12 @@ class LogConfigTest { logProps.put(TopicConfig.RETENTION_BYTES_CONFIG, "128") if (sysRemoteStorageEnabled) { val message = assertThrows(classOf[ConfigException], - () => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())) + () => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, + kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true)) assertTrue(message.getMessage.contains(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG)) } else { - LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) + LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, + kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true) } } @@ -426,19 +440,34 @@ class LogConfigTest { } @ParameterizedTest - @ValueSource(strings = Array(TopicConfig.REMOTE_LOG_DISABLE_POLICY_RETAIN, TopicConfig.REMOTE_LOG_DISABLE_POLICY_DELETE)) - def testValidRemoteLogDisablePolicy(policy: String): Unit = { + @ValueSource(booleans = Array(true, false)) + def testValidRemoteLogCopyDisabled(copyDisabled: Boolean): Unit = { val logProps = new Properties - logProps.put(TopicConfig.REMOTE_LOG_DISABLE_POLICY_CONFIG, policy) + logProps.put(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, copyDisabled.toString) LogConfig.validate(logProps) } @ParameterizedTest - @ValueSource(strings = Array("keep", "remove")) - def testInvalidRemoteLogDisablePolicy(policy: String): Unit = { + @ValueSource(booleans = Array(true, false)) + def testValidRemoteLogDeleteOnDisable(deleteOnDisable: Boolean): Unit = { val logProps = new Properties - logProps.put(TopicConfig.REMOTE_LOG_DISABLE_POLICY_CONFIG, policy) - assertThrows(classOf[ConfigException], () => LogConfig.validate(logProps)) + logProps.put(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, deleteOnDisable.toString) + LogConfig.validate(logProps) + } + + @ParameterizedTest + @ValueSource(strings = Array(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG)) + def testInValidRemoteConfigsInZK(configKey: String): Unit = { + val kafkaProps = TestUtils.createDummyBrokerConfig() + kafkaProps.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true") + val kafkaConfig = KafkaConfig.fromProps(kafkaProps) + val logProps = new Properties + logProps.put(configKey, "true") + + val message = assertThrows(classOf[InvalidConfigurationException], + () => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, true, true)) + assertTrue(message.getMessage.contains("It is invalid to set `remote.log.delete.on.disable` or " + + "`remote.log.copy.disable` under Zookeeper's mode.")) } /* Verify that when the deprecated config LOG_MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG has non default value the new configs diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala index b5a029a7411..ceb2342446f 100755 --- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala @@ -797,7 +797,7 @@ class LogManagerTest { val newProperties = new Properties() newProperties.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE) - spyLogManager.updateTopicConfig(topic, newProperties, isRemoteLogStorageSystemEnabled = false) + spyLogManager.updateTopicConfig(topic, newProperties, isRemoteLogStorageSystemEnabled = false, wasRemoteLogEnabled = false, fromZK = false) assertTrue(log0.config.delete) assertTrue(log1.config.delete) diff --git a/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala b/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala index 250f07ca23e..4cf5ad70cee 100644 --- a/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala +++ b/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala @@ -93,7 +93,9 @@ class ControllerConfigurationValidatorTest { val config = new util.TreeMap[String, String]() config.put(REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false") if (wasRemoteStorageEnabled) { - assertEquals("Disabling remote storage feature on the topic level is not supported.", + assertEquals("It is invalid to disable remote storage without deleting remote data. " + + "If you want to keep the remote data and turn to read only, please set `remote.storage.enable=true,remote.log.copy.disable=true`. " + + "If you want to disable remote storage and delete all remote data, please set `remote.storage.enable=false,remote.log.delete.on.disable=true`.", assertThrows(classOf[InvalidConfigurationException], () => validator.validate( new ConfigResource(TOPIC, "foo"), config, util.Collections.singletonMap(REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true"))).getMessage) } else { diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala index cc4623b1277..dcec4a75d96 100644 --- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala +++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala @@ -38,6 +38,7 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal import org.apache.kafka.common.{TopicPartition, Uuid} import org.apache.kafka.server.common.MetadataVersion.IBP_3_0_IV1 import org.apache.kafka.server.config.{ConfigType, QuotaConfigs, ServerLogConfigs, ZooKeeperInternals} +import org.apache.kafka.storage.internals.log.LogConfig import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{Test, Timeout} import org.junit.jupiter.params.ParameterizedTest @@ -659,6 +660,7 @@ class DynamicConfigChangeUnitTest { when(log0.remoteLogEnabled()).thenReturn(true) when(partition0.isLeader).thenReturn(true) when(replicaManager.onlinePartition(tp0)).thenReturn(Some(partition0)) + when(log0.config).thenReturn(new LogConfig(Collections.emptyMap())) val tp1 = new TopicPartition(topic, 1) val log1: UnifiedLog = mock(classOf[UnifiedLog]) @@ -667,6 +669,7 @@ class DynamicConfigChangeUnitTest { when(log1.remoteLogEnabled()).thenReturn(true) when(partition1.isLeader).thenReturn(false) when(replicaManager.onlinePartition(tp1)).thenReturn(Some(partition1)) + when(log1.config).thenReturn(new LogConfig(Collections.emptyMap())) val leaderPartitionsArg: ArgumentCaptor[util.Set[Partition]] = ArgumentCaptor.forClass(classOf[util.Set[Partition]]) val followerPartitionsArg: ArgumentCaptor[util.Set[Partition]] = ArgumentCaptor.forClass(classOf[util.Set[Partition]]) @@ -674,7 +677,7 @@ class DynamicConfigChangeUnitTest { val isRemoteLogEnabledBeforeUpdate = false val configHandler: TopicConfigHandler = new TopicConfigHandler(replicaManager, null, null, None) - configHandler.maybeBootstrapRemoteLogComponents(topic, Seq(log0, log1), isRemoteLogEnabledBeforeUpdate) + configHandler.maybeUpdateRemoteLogComponents(topic, Seq(log0, log1), isRemoteLogEnabledBeforeUpdate, false) assertEquals(Collections.singleton(partition0), leaderPartitionsArg.getValue) assertEquals(Collections.singleton(partition1), followerPartitionsArg.getValue) } @@ -682,17 +685,23 @@ class DynamicConfigChangeUnitTest { @Test def testEnableRemoteLogStorageOnTopicOnAlreadyEnabledTopic(): Unit = { val topic = "test-topic" + val tp0 = new TopicPartition(topic, 0) val rlm: RemoteLogManager = mock(classOf[RemoteLogManager]) val replicaManager: ReplicaManager = mock(classOf[ReplicaManager]) + val partition: Partition = mock(classOf[Partition]) when(replicaManager.remoteLogManager).thenReturn(Some(rlm)) + when(replicaManager.onlinePartition(tp0)).thenReturn(Some(partition)) val log0: UnifiedLog = mock(classOf[UnifiedLog]) when(log0.remoteLogEnabled()).thenReturn(true) doNothing().when(rlm).onLeadershipChange(any(), any(), any()) + when(log0.config).thenReturn(new LogConfig(Collections.emptyMap())) + when(log0.topicPartition).thenReturn(tp0) + when(partition.isLeader).thenReturn(true) val isRemoteLogEnabledBeforeUpdate = true val configHandler: TopicConfigHandler = new TopicConfigHandler(replicaManager, null, null, None) - configHandler.maybeBootstrapRemoteLogComponents(topic, Seq(log0), isRemoteLogEnabledBeforeUpdate) + configHandler.maybeUpdateRemoteLogComponents(topic, Seq(log0), isRemoteLogEnabledBeforeUpdate, false) verify(rlm, never()).onLeadershipChange(any(), any(), any()) } } diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java index d026ea3b4b1..a0ca52678ad 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java @@ -113,13 +113,15 @@ public class LogConfig extends AbstractConfig { public static class RemoteLogConfig { public final boolean remoteStorageEnable; - public final String remoteLogDisablePolicy; + public final boolean remoteLogDeleteOnDisable; + public final boolean remoteLogCopyDisable; public final long localRetentionMs; public final long localRetentionBytes; private RemoteLogConfig(LogConfig config) { this.remoteStorageEnable = config.getBoolean(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG); - this.remoteLogDisablePolicy = config.getString(TopicConfig.REMOTE_LOG_DISABLE_POLICY_CONFIG); + this.remoteLogCopyDisable = config.getBoolean(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG); + this.remoteLogDeleteOnDisable = config.getBoolean(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG); this.localRetentionMs = config.getLong(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG); this.localRetentionBytes = config.getLong(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG); } @@ -128,7 +130,8 @@ public class LogConfig extends AbstractConfig { public String toString() { return "RemoteLogConfig{" + "remoteStorageEnable=" + remoteStorageEnable + - ", remoteLogDisablePolicy=" + remoteLogDisablePolicy + + ", remoteLogCopyDisable=" + remoteLogCopyDisable + + ", remoteLogDeleteOnDisable=" + remoteLogDeleteOnDisable + ", localRetentionMs=" + localRetentionMs + ", localRetentionBytes=" + localRetentionBytes + '}'; @@ -204,7 +207,8 @@ public class LogConfig extends AbstractConfig { // Visible for testing public static final Set CONFIGS_WITH_NO_SERVER_DEFAULTS = Collections.unmodifiableSet(Utils.mkSet( TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, - TopicConfig.REMOTE_LOG_DISABLE_POLICY_CONFIG, + TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, + TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, QuotaConfigs.LEADER_REPLICATION_THROTTLED_REPLICAS_CONFIG, QuotaConfigs.FOLLOWER_REPLICATION_THROTTLED_REPLICAS_CONFIG )); @@ -325,9 +329,8 @@ public class LogConfig extends AbstractConfig { TopicConfig.LOCAL_LOG_RETENTION_MS_DOC) .define(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, LONG, DEFAULT_LOCAL_RETENTION_BYTES, atLeast(-2), MEDIUM, TopicConfig.LOCAL_LOG_RETENTION_BYTES_DOC) - .define(TopicConfig.REMOTE_LOG_DISABLE_POLICY_CONFIG, STRING, TopicConfig.REMOTE_LOG_DISABLE_POLICY_RETAIN, - in(TopicConfig.REMOTE_LOG_DISABLE_POLICY_RETAIN, TopicConfig.REMOTE_LOG_DISABLE_POLICY_DELETE), - MEDIUM, TopicConfig.REMOTE_LOG_DISABLE_POLICY_DOC); + .define(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, BOOLEAN, false, MEDIUM, TopicConfig.REMOTE_LOG_COPY_DISABLE_DOC) + .define(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, BOOLEAN, false, MEDIUM, TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_DOC); } public final Set overriddenConfigs; @@ -508,8 +511,12 @@ public class LogConfig extends AbstractConfig { return remoteLogConfig.remoteStorageEnable; } - public String remoteLogDisablePolicy() { - return remoteLogConfig.remoteLogDisablePolicy; + public Boolean remoteLogDeleteOnDisable() { + return remoteLogConfig.remoteLogDeleteOnDisable; + } + + public Boolean remoteLogCopyDisable() { + return remoteLogConfig.remoteLogCopyDisable; } public long localRetentionMs() { @@ -613,11 +620,17 @@ public class LogConfig extends AbstractConfig { * @param existingConfigs The existing properties * @param newConfigs The new properties to be validated * @param isRemoteLogStorageSystemEnabled true if system wise remote log storage is enabled + * @param fromZK true if this is a ZK cluster */ private static void validateTopicLogConfigValues(Map existingConfigs, Map newConfigs, - boolean isRemoteLogStorageSystemEnabled) { + boolean isRemoteLogStorageSystemEnabled, + boolean fromZK) { validateValues(newConfigs); + + if (fromZK) { + validateNoInvalidRemoteStorageConfigsInZK(newConfigs); + } boolean isRemoteLogStorageEnabled = (Boolean) newConfigs.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG); if (isRemoteLogStorageEnabled) { validateRemoteStorageOnlyIfSystemEnabled(newConfigs, isRemoteLogStorageSystemEnabled, false); @@ -626,14 +639,26 @@ public class LogConfig extends AbstractConfig { validateRemoteStorageRetentionTime(newConfigs); } else { // The new config "remote.storage.enable" is false, validate if it's turning from true to false - validateNotTurningOffRemoteStorage(existingConfigs); + boolean wasRemoteLogEnabled = Boolean.parseBoolean(existingConfigs.getOrDefault(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false")); + validateTurningOffRemoteStorageWithDelete(newConfigs, wasRemoteLogEnabled, isRemoteLogStorageEnabled); } } - public static void validateNotTurningOffRemoteStorage(Map existingConfigs) { - boolean wasRemoteLogEnabledBeforeUpdate = Boolean.parseBoolean(existingConfigs.getOrDefault(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false")); - if (wasRemoteLogEnabledBeforeUpdate) { - throw new InvalidConfigurationException("Disabling remote storage feature on the topic level is not supported."); + public static void validateTurningOffRemoteStorageWithDelete(Map newConfigs, boolean wasRemoteLogEnabled, boolean isRemoteLogStorageEnabled) { + boolean isRemoteLogDeleteOnDisable = (Boolean) Utils.castToStringObjectMap(newConfigs).getOrDefault(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, false); + if (wasRemoteLogEnabled && !isRemoteLogStorageEnabled && !isRemoteLogDeleteOnDisable) { + throw new InvalidConfigurationException("It is invalid to disable remote storage without deleting remote data. " + + "If you want to keep the remote data and turn to read only, please set `remote.storage.enable=true,remote.log.copy.disable=true`. " + + "If you want to disable remote storage and delete all remote data, please set `remote.storage.enable=false,remote.log.delete.on.disable=true`."); + } + } + + public static void validateNoInvalidRemoteStorageConfigsInZK(Map newConfigs) { + boolean isRemoteLogDeleteOnDisable = (Boolean) Utils.castToStringObjectMap(newConfigs).getOrDefault(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, false); + boolean isRemoteLogCopyDisabled = (Boolean) Utils.castToStringObjectMap(newConfigs).getOrDefault(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, false); + if (isRemoteLogDeleteOnDisable || isRemoteLogCopyDisabled) { + throw new InvalidConfigurationException("It is invalid to set `remote.log.delete.on.disable` or " + + "`remote.log.copy.disable` under Zookeeper's mode."); } } @@ -694,13 +719,14 @@ public class LogConfig extends AbstractConfig { * Check that the given properties contain only valid log config names and that all values can be parsed and are valid */ public static void validate(Properties props) { - validate(Collections.emptyMap(), props, Collections.emptyMap(), false); + validate(Collections.emptyMap(), props, Collections.emptyMap(), false, false); } public static void validate(Map existingConfigs, Properties props, Map configuredProps, - boolean isRemoteLogStorageSystemEnabled) { + boolean isRemoteLogStorageSystemEnabled, + boolean fromZK) { validateNames(props); if (configuredProps == null || configuredProps.isEmpty()) { Map valueMaps = CONFIG.parse(props); @@ -709,7 +735,7 @@ public class LogConfig extends AbstractConfig { Map combinedConfigs = new HashMap<>(configuredProps); combinedConfigs.putAll(props); Map valueMaps = CONFIG.parse(combinedConfigs); - validateTopicLogConfigValues(existingConfigs, valueMaps, isRemoteLogStorageSystemEnabled); + validateTopicLogConfigValues(existingConfigs, valueMaps, isRemoteLogStorageSystemEnabled, fromZK); } } diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/DisableRemoteLogOnTopicTest.java b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/DisableRemoteLogOnTopicTest.java new file mode 100644 index 00000000000..6bcaea6a363 --- /dev/null +++ b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/DisableRemoteLogOnTopicTest.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tiered.storage.integration; + +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.tiered.storage.TieredStorageTestBuilder; +import org.apache.kafka.tiered.storage.TieredStorageTestHarness; +import org.apache.kafka.tiered.storage.specs.KeyValueSpec; + +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.DELETE_SEGMENT; + +public final class DisableRemoteLogOnTopicTest extends TieredStorageTestHarness { + + @Override + public int brokerCount() { + return 2; + } + + @ParameterizedTest(name = "{displayName}.quorum={0}") + @ValueSource(strings = {"kraft"}) + public void executeTieredStorageTest(String quorum) { + super.executeTieredStorageTest(quorum); + } + + @Override + protected void writeTestSpecifications(TieredStorageTestBuilder builder) { + final Integer broker0 = 0; + final Integer broker1 = 1; + final String topicA = "topicA"; + final Integer p0 = 0; + final Integer partitionCount = 1; + final Integer replicationFactor = 2; + final Integer maxBatchCountPerSegment = 1; + final boolean enableRemoteLogStorage = true; + final Map> assignment = mkMap( + mkEntry(p0, Arrays.asList(broker0, broker1)) + ); + final Map disableCopy = new HashMap<>(); + disableCopy.put(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, "true"); + + final Map deleteOnDisable = new HashMap<>(); + deleteOnDisable.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false"); + deleteOnDisable.put(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, "true"); + + builder + .createTopic(topicA, partitionCount, replicationFactor, maxBatchCountPerSegment, assignment, + enableRemoteLogStorage) + // send records to partition 0 + .expectSegmentToBeOffloaded(broker0, topicA, p0, 0, new KeyValueSpec("k0", "v0")) + .expectSegmentToBeOffloaded(broker0, topicA, p0, 1, new KeyValueSpec("k1", "v1")) + .expectEarliestLocalOffsetInLogDirectory(topicA, p0, 2L) + .produce(topicA, p0, new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1", "v1"), + new KeyValueSpec("k2", "v2")) + // disable remote log copy + .updateTopicConfig(topicA, + Collections.singletonMap(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, "true"), + Collections.emptyList()) + + // make sure we can still consume from the beginning of the topic to read data from local and remote storage + .expectFetchFromTieredStorage(broker0, topicA, p0, 2) + .consume(topicA, p0, 0L, 3, 2) + + // re-enable remote log copy + .updateTopicConfig(topicA, + Collections.singletonMap(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, "false"), + Collections.emptyList()) + + // make sure the logs can be offloaded + .expectEarliestLocalOffsetInLogDirectory(topicA, p0, 3L) + .produce(topicA, p0, new KeyValueSpec("k3", "v3")) + + // explicitly disable remote log copy + .updateTopicConfig(topicA, + disableCopy, + Collections.emptyList()) + // make sure we can still consume from the beginning of the topic to read data from local and remote storage + .expectFetchFromTieredStorage(broker0, topicA, p0, 3) + .consume(topicA, p0, 0L, 4, 3) + + // verify the remote retention policy is working. + // Use DELETE_RECORDS API to delete the records upto offset 1 and expect 1 remote segment to be deleted + .expectDeletionInRemoteStorage(broker0, topicA, p0, DELETE_SEGMENT, 1) + .deleteRecords(topicA, p0, 1L) + .waitForRemoteLogSegmentDeletion(topicA) + + // disabling remote log on topicA and enabling deleteOnDisable + .updateTopicConfig(topicA, + deleteOnDisable, + Collections.emptyList()) + // make sure all remote data is deleted + .expectEmptyRemoteStorage(topicA, p0) + // verify the local log is still consumable + .consume(topicA, p0, 3L, 1, 0); + } +}