mirror of https://github.com/apache/kafka.git
				
				
				
			KAFKA-16855: remote log disable policy in KRaft (#16653)
Reviewers: Kamal Chandraprakash <kamal.chandraprakash@gmail.com>, Christo Lolov <lolovc@amazon.com>
This commit is contained in:
		
							parent
							
								
									6e324487fa
								
							
						
					
					
						commit
						9f7e8d478a
					
				|  | @ -93,17 +93,14 @@ public class TopicConfig { | |||
|             "deletes the old segments. Default value is -2, it represents `retention.bytes` value to be used. The effective value should always be " + | ||||
|             "less than or equal to `retention.bytes` value."; | ||||
| 
 | ||||
|     public static final String REMOTE_LOG_DISABLE_POLICY_RETAIN = "retain"; | ||||
|     public static final String REMOTE_LOG_DISABLE_POLICY_DELETE = "delete"; | ||||
|     public static final String REMOTE_LOG_COPY_DISABLE_CONFIG = "remote.log.copy.disable"; | ||||
|     public static final String REMOTE_LOG_COPY_DISABLE_DOC = "Determines whether tiered data for a topic should become read only," + | ||||
|             " and no more data uploading on a topic."; | ||||
| 
 | ||||
|     public static final String REMOTE_LOG_DISABLE_POLICY_CONFIG = "remote.log.disable.policy"; | ||||
| 
 | ||||
|     public static final String REMOTE_LOG_DISABLE_POLICY_DOC = String.format("Determines whether tiered data for a topic should be retained or " + | ||||
|             "deleted after tiered storage disablement on a topic. The two valid options are \"%s\" and \"%s\". If %s is " + | ||||
|             "selected then all data in remote will be kept post-disablement and will only be deleted when it breaches expiration " + | ||||
|             "thresholds. If %s is selected then the data will be made inaccessible immediately by advancing the log start offset and will be " + | ||||
|             "deleted asynchronously.", REMOTE_LOG_DISABLE_POLICY_RETAIN, REMOTE_LOG_DISABLE_POLICY_DELETE, | ||||
|             REMOTE_LOG_DISABLE_POLICY_RETAIN, REMOTE_LOG_DISABLE_POLICY_DELETE); | ||||
|     public static final String REMOTE_LOG_DELETE_ON_DISABLE_CONFIG = "remote.log.delete.on.disable"; | ||||
|     public static final String REMOTE_LOG_DELETE_ON_DISABLE_DOC = "Determines whether tiered data for a topic should be " + | ||||
|             "deleted after tiered storage is disabled on a topic. This configuration should be enabled when trying to " + | ||||
|             "set `remote.storage.enable` from true to false"; | ||||
| 
 | ||||
|     public static final String MAX_MESSAGE_BYTES_CONFIG = "max.message.bytes"; | ||||
|     public static final String MAX_MESSAGE_BYTES_DOC = | ||||
|  |  | |||
|  | @ -437,30 +437,48 @@ public class RemoteLogManager implements Closeable { | |||
|             throw new KafkaException("RemoteLogManager is not configured when remote storage system is enabled"); | ||||
|         } | ||||
| 
 | ||||
|         Set<TopicIdPartition> leaderPartitions = filterPartitions(partitionsBecomeLeader) | ||||
|                 .map(p -> new TopicIdPartition(topicIds.get(p.topic()), p.topicPartition())).collect(Collectors.toSet()); | ||||
|         Map<TopicIdPartition, Boolean> leaderPartitions = filterPartitions(partitionsBecomeLeader) | ||||
|                 .collect(Collectors.toMap(p -> new TopicIdPartition(topicIds.get(p.topic()), p.topicPartition()), | ||||
|                         p -> p.log().exists(log -> log.config().remoteLogCopyDisable()))); | ||||
| 
 | ||||
|         Set<TopicIdPartition> followerPartitions = filterPartitions(partitionsBecomeFollower) | ||||
|                 .map(p -> new TopicIdPartition(topicIds.get(p.topic()), p.topicPartition())).collect(Collectors.toSet()); | ||||
|         Map<TopicIdPartition, Boolean> followerPartitions = filterPartitions(partitionsBecomeFollower) | ||||
|                 .collect(Collectors.toMap(p -> new TopicIdPartition(topicIds.get(p.topic()), p.topicPartition()), | ||||
|                         p -> p.log().exists(log -> log.config().remoteLogCopyDisable()))); | ||||
| 
 | ||||
|         if (!leaderPartitions.isEmpty() || !followerPartitions.isEmpty()) { | ||||
|             LOGGER.debug("Effective topic partitions after filtering compact and internal topics, leaders: {} and followers: {}", | ||||
|                     leaderPartitions, followerPartitions); | ||||
| 
 | ||||
|             leaderPartitions.forEach(this::cacheTopicPartitionIds); | ||||
|             followerPartitions.forEach(this::cacheTopicPartitionIds); | ||||
|             leaderPartitions.forEach((tp, __) -> cacheTopicPartitionIds(tp)); | ||||
|             followerPartitions.forEach((tp, __) -> cacheTopicPartitionIds(tp)); | ||||
| 
 | ||||
|             remoteLogMetadataManager.onPartitionLeadershipChanges(leaderPartitions, followerPartitions); | ||||
|             followerPartitions.forEach(this::doHandleFollowerPartition); | ||||
|             remoteLogMetadataManager.onPartitionLeadershipChanges(leaderPartitions.keySet(), followerPartitions.keySet()); | ||||
|             followerPartitions.forEach((tp, __) -> doHandleFollowerPartition(tp)); | ||||
| 
 | ||||
|             // If this node was the previous leader for the partition, then the RLMTask might be running in the | ||||
|             // background thread and might emit metrics. So, removing the metrics after marking this node as follower. | ||||
|             followerPartitions.forEach(this::removeRemoteTopicPartitionMetrics); | ||||
|             followerPartitions.forEach((tp, __) -> removeRemoteTopicPartitionMetrics(tp)); | ||||
| 
 | ||||
|             leaderPartitions.forEach(this::doHandleLeaderPartition); | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     public void stopLeaderCopyRLMTasks(Set<Partition> partitions) { | ||||
|         for (Partition partition : partitions) { | ||||
|             TopicPartition tp = partition.topicPartition(); | ||||
|             if (topicIdByPartitionMap.containsKey(tp)) { | ||||
|                 TopicIdPartition tpId = new TopicIdPartition(topicIdByPartitionMap.get(tp), tp); | ||||
|                 leaderCopyRLMTasks.computeIfPresent(tpId, (topicIdPartition, task) -> { | ||||
|                     LOGGER.info("Cancelling the copy RLM task for tpId: {}", tpId); | ||||
|                     task.cancel(); | ||||
|                     LOGGER.info("Resetting remote copy lag metrics for tpId: {}", tpId); | ||||
|                     ((RLMCopyTask) task.rlmTask).resetLagStats(); | ||||
|                     return null; | ||||
|                 }); | ||||
|             } | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     /** | ||||
|      * Stop the remote-log-manager task for the given partitions. And, calls the | ||||
|      * {@link RemoteLogMetadataManager#onStopPartitions(Set)} when {@link StopPartition#deleteLocalLog()} is true. | ||||
|  | @ -507,16 +525,18 @@ public class RemoteLogManager implements Closeable { | |||
|                 LOGGER.error("Error while stopping the partition: {}", stopPartition, ex); | ||||
|             } | ||||
|         } | ||||
|         // Note `deleteLocalLog` will always be true when `deleteRemoteLog` is true but not the other way around. | ||||
|         Set<TopicIdPartition> deleteLocalPartitions = stopPartitions.stream() | ||||
|                 .filter(sp -> sp.deleteLocalLog() && topicIdByPartitionMap.containsKey(sp.topicPartition())) | ||||
| 
 | ||||
|         // We want to remote topicId map and stopPartition on RLMM for deleteLocalLog or stopRLMM partitions because | ||||
|         // in both case, they all mean the topic will not be held in this broker anymore. | ||||
|         // NOTE: In ZK mode, this#stopPartitions method is called when Replica state changes to Offline and ReplicaDeletionStarted | ||||
|         Set<TopicIdPartition> pendingActionsPartitions = stopPartitions.stream() | ||||
|                 .filter(sp -> (sp.stopRemoteLogMetadataManager() || sp.deleteLocalLog()) && topicIdByPartitionMap.containsKey(sp.topicPartition())) | ||||
|                 .map(sp -> new TopicIdPartition(topicIdByPartitionMap.get(sp.topicPartition()), sp.topicPartition())) | ||||
|                 .collect(Collectors.toSet()); | ||||
|         if (!deleteLocalPartitions.isEmpty()) { | ||||
|             // NOTE: In ZK mode, this#stopPartitions method is called when Replica state changes to Offline and | ||||
|             // ReplicaDeletionStarted | ||||
|             remoteLogMetadataManager.onStopPartitions(deleteLocalPartitions); | ||||
|             deleteLocalPartitions.forEach(tpId -> topicIdByPartitionMap.remove(tpId.topicPartition())); | ||||
| 
 | ||||
|         if (!pendingActionsPartitions.isEmpty()) { | ||||
|             pendingActionsPartitions.forEach(tpId -> topicIdByPartitionMap.remove(tpId.topicPartition())); | ||||
|             remoteLogMetadataManager.onStopPartitions(pendingActionsPartitions); | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|  | @ -986,6 +1006,13 @@ public class RemoteLogManager implements Closeable { | |||
|             } | ||||
|         } | ||||
| 
 | ||||
|         void resetLagStats() { | ||||
|             String topic = topicIdPartition.topic(); | ||||
|             int partition = topicIdPartition.partition(); | ||||
|             brokerTopicStats.recordRemoteCopyLagBytes(topic, partition, 0); | ||||
|             brokerTopicStats.recordRemoteCopyLagSegments(topic, partition, 0); | ||||
|         } | ||||
| 
 | ||||
|         private Path toPathIfExists(File file) { | ||||
|             return file.exists() ? file.toPath() : null; | ||||
|         } | ||||
|  | @ -1794,13 +1821,15 @@ public class RemoteLogManager implements Closeable { | |||
|                 new RemoteLogReader(fetchInfo, this, callback, brokerTopicStats, rlmFetchQuotaManager, remoteReadTimer)); | ||||
|     } | ||||
| 
 | ||||
|     void doHandleLeaderPartition(TopicIdPartition topicPartition) { | ||||
|     void doHandleLeaderPartition(TopicIdPartition topicPartition, Boolean remoteLogCopyDisable) { | ||||
|         RLMTaskWithFuture followerRLMTaskWithFuture = followerRLMTasks.remove(topicPartition); | ||||
|         if (followerRLMTaskWithFuture != null) { | ||||
|             LOGGER.info("Cancelling the follower task: {}", followerRLMTaskWithFuture.rlmTask); | ||||
|             followerRLMTaskWithFuture.cancel(); | ||||
|         } | ||||
| 
 | ||||
|         // Only create copy task when remoteLogCopyDisable is disabled | ||||
|         if (!remoteLogCopyDisable) { | ||||
|             leaderCopyRLMTasks.computeIfAbsent(topicPartition, topicIdPartition -> { | ||||
|                 RLMCopyTask task = new RLMCopyTask(topicIdPartition, this.rlmConfig.remoteLogMetadataCustomMetadataMaxBytes()); | ||||
|                 // set this upfront when it is getting initialized instead of doing it after scheduling. | ||||
|  | @ -1808,6 +1837,7 @@ public class RemoteLogManager implements Closeable { | |||
|                 ScheduledFuture<?> future = rlmCopyThreadPool.scheduleWithFixedDelay(task, 0, delayInMs, TimeUnit.MILLISECONDS); | ||||
|                 return new RLMTaskWithFuture(task, future); | ||||
|             }); | ||||
|         } | ||||
| 
 | ||||
|         leaderExpirationRLMTasks.computeIfAbsent(topicPartition, topicIdPartition -> { | ||||
|             RLMExpirationTask task = new RLMExpirationTask(topicIdPartition); | ||||
|  |  | |||
|  | @ -964,15 +964,23 @@ class LogManager(logDirs: Seq[File], | |||
|    */ | ||||
|   def updateTopicConfig(topic: String, | ||||
|                         newTopicConfig: Properties, | ||||
|                         isRemoteLogStorageSystemEnabled: Boolean): Unit = { | ||||
|                         isRemoteLogStorageSystemEnabled: Boolean, | ||||
|                         wasRemoteLogEnabled: Boolean, | ||||
|                         fromZK: Boolean): Unit = { | ||||
|     topicConfigUpdated(topic) | ||||
|     val logs = logsByTopic(topic) | ||||
|     // Combine the default properties with the overrides in zk to create the new LogConfig | ||||
|     val newLogConfig = LogConfig.fromProps(currentDefaultConfig.originals, newTopicConfig) | ||||
|     val isRemoteLogStorageEnabled = newLogConfig.remoteStorageEnable() | ||||
|     // We would like to validate the configuration no matter whether the logs have materialised on disk or not. | ||||
|     // Otherwise we risk someone creating a tiered-topic, disabling Tiered Storage cluster-wide and the check | ||||
|     // failing since the logs for the topic are non-existent. | ||||
|     LogConfig.validateRemoteStorageOnlyIfSystemEnabled(newLogConfig.values(), isRemoteLogStorageSystemEnabled, true) | ||||
|     // `remote.log.delete.on.disable` and `remote.log.copy.disable` are unsupported in ZK mode | ||||
|     if (fromZK) { | ||||
|       LogConfig.validateNoInvalidRemoteStorageConfigsInZK(newLogConfig.values()) | ||||
|     } | ||||
|     LogConfig.validateTurningOffRemoteStorageWithDelete(newLogConfig.values(), wasRemoteLogEnabled, isRemoteLogStorageEnabled) | ||||
|     if (logs.nonEmpty) { | ||||
|       logs.foreach { log => | ||||
|         val oldLogConfig = log.updateConfig(newLogConfig) | ||||
|  |  | |||
|  | @ -26,14 +26,14 @@ import kafka.server.Constants._ | |||
| import kafka.server.QuotaFactory.QuotaManagers | ||||
| import kafka.utils.Implicits._ | ||||
| import kafka.utils.Logging | ||||
| import org.apache.kafka.server.config.{ReplicationConfigs, QuotaConfigs, ZooKeeperInternals} | ||||
| import org.apache.kafka.server.config.{QuotaConfigs, ReplicationConfigs, ZooKeeperInternals} | ||||
| import org.apache.kafka.common.config.TopicConfig | ||||
| import org.apache.kafka.common.metrics.Quota | ||||
| import org.apache.kafka.common.metrics.Quota._ | ||||
| import org.apache.kafka.common.utils.Sanitizer | ||||
| import org.apache.kafka.security.CredentialProvider | ||||
| import org.apache.kafka.server.ClientMetricsManager | ||||
| import org.apache.kafka.storage.internals.log.ThrottledReplicaListValidator | ||||
| import org.apache.kafka.storage.internals.log.{LogStartOffsetIncrementReason, ThrottledReplicaListValidator} | ||||
| import org.apache.kafka.storage.internals.log.LogConfig.MessageFormatVersion | ||||
| 
 | ||||
| import scala.annotation.nowarn | ||||
|  | @ -68,25 +68,61 @@ class TopicConfigHandler(private val replicaManager: ReplicaManager, | |||
|     } | ||||
| 
 | ||||
|     val logs = logManager.logsByTopic(topic) | ||||
|     val wasRemoteLogEnabledBeforeUpdate = logs.exists(_.remoteLogEnabled()) | ||||
|     val wasRemoteLogEnabled = logs.exists(_.remoteLogEnabled()) | ||||
|     val wasCopyDisabled = logs.exists(_.config.remoteLogCopyDisable()) | ||||
| 
 | ||||
|     logManager.updateTopicConfig(topic, props, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) | ||||
|     maybeBootstrapRemoteLogComponents(topic, logs, wasRemoteLogEnabledBeforeUpdate) | ||||
|     // kafkaController is only defined in Zookeeper's mode | ||||
|     logManager.updateTopicConfig(topic, props, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), | ||||
|       wasRemoteLogEnabled, kafkaController.isDefined) | ||||
|     maybeUpdateRemoteLogComponents(topic, logs, wasRemoteLogEnabled, wasCopyDisabled) | ||||
|   } | ||||
| 
 | ||||
|   private[server] def maybeBootstrapRemoteLogComponents(topic: String, | ||||
|   private[server] def maybeUpdateRemoteLogComponents(topic: String, | ||||
|                                                      logs: Seq[UnifiedLog], | ||||
|                                                         wasRemoteLogEnabledBeforeUpdate: Boolean): Unit = { | ||||
|                                                      wasRemoteLogEnabled: Boolean, | ||||
|                                                      wasCopyDisabled: Boolean): Unit = { | ||||
|     val isRemoteLogEnabled = logs.exists(_.remoteLogEnabled()) | ||||
|     // Topic configs gets updated incrementally. This check is added to prevent redundant updates. | ||||
|     if (!wasRemoteLogEnabledBeforeUpdate && isRemoteLogEnabled) { | ||||
|     val isCopyDisabled = logs.exists(_.config.remoteLogCopyDisable()) | ||||
|     val isDeleteOnDisable = logs.exists(_.config.remoteLogDeleteOnDisable()) | ||||
| 
 | ||||
|     val (leaderPartitions, followerPartitions) = | ||||
|       logs.flatMap(log => replicaManager.onlinePartition(log.topicPartition)).partition(_.isLeader) | ||||
| 
 | ||||
|     // Topic configs gets updated incrementally. This check is added to prevent redundant updates. | ||||
|     // When remote log is enabled, or remote copy is enabled, we should create RLM tasks accordingly via `onLeadershipChange`. | ||||
|     if (isRemoteLogEnabled && (!wasRemoteLogEnabled || (wasCopyDisabled && !isCopyDisabled))) { | ||||
|       val topicIds = Collections.singletonMap(topic, replicaManager.metadataCache.getTopicId(topic)) | ||||
|       replicaManager.remoteLogManager.foreach(rlm => | ||||
|         rlm.onLeadershipChange(leaderPartitions.toSet.asJava, followerPartitions.toSet.asJava, topicIds)) | ||||
|     } else if (wasRemoteLogEnabledBeforeUpdate && !isRemoteLogEnabled) { | ||||
|       warn(s"Disabling remote log on the topic: $topic is not supported.") | ||||
|     } | ||||
| 
 | ||||
|     // When copy disabled, we should stop leaderCopyRLMTask, but keep expirationTask | ||||
|     if (isRemoteLogEnabled && !wasCopyDisabled && isCopyDisabled) { | ||||
|       replicaManager.remoteLogManager.foreach(rlm => { | ||||
|         rlm.stopLeaderCopyRLMTasks(leaderPartitions.toSet.asJava); | ||||
|       }) | ||||
|     } | ||||
| 
 | ||||
|     // Disabling remote log storage on this topic | ||||
|     if (wasRemoteLogEnabled && !isRemoteLogEnabled && isDeleteOnDisable) { | ||||
|       val stopPartitions: java.util.HashSet[StopPartition] = new java.util.HashSet[StopPartition]() | ||||
|       leaderPartitions.foreach(partition => { | ||||
|         // delete remote logs and stop RemoteLogMetadataManager | ||||
|         stopPartitions.add(StopPartition(partition.topicPartition, deleteLocalLog = false, | ||||
|           deleteRemoteLog = true, stopRemoteLogMetadataManager = true)) | ||||
|       }) | ||||
| 
 | ||||
|       followerPartitions.foreach(partition => { | ||||
|         // we need to cancel follower tasks and stop RemoteLogMetadataManager | ||||
|         stopPartitions.add(StopPartition(partition.topicPartition, deleteLocalLog = false, | ||||
|           deleteRemoteLog = false, stopRemoteLogMetadataManager = true)) | ||||
|       }) | ||||
| 
 | ||||
|       // update the log start offset to local log start offset for the leader replicas | ||||
|       logs.filter(log => leaderPartitions.exists(p => p.topicPartition.equals(log.topicPartition))) | ||||
|         .foreach(log => log.maybeIncrementLogStartOffset(log.localLogStartOffset(), LogStartOffsetIncrementReason.SegmentDeletion)) | ||||
| 
 | ||||
|       replicaManager.remoteLogManager.foreach(rlm => rlm.stopPartitions(stopPartitions, (_, _) => {})) | ||||
|     } | ||||
|   } | ||||
| 
 | ||||
|  |  | |||
|  | @ -109,7 +109,7 @@ class ControllerConfigurationValidator(kafkaConfig: KafkaConfig) extends Configu | |||
|             nullTopicConfigs.mkString(",")) | ||||
|         } | ||||
|         LogConfig.validate(oldConfigs, properties, kafkaConfig.extractLogConfigMap, | ||||
|           kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) | ||||
|           kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), false) | ||||
|       case BROKER => validateBrokerName(resource.name()) | ||||
|       case CLIENT_METRICS => | ||||
|         val properties = new Properties() | ||||
|  |  | |||
|  | @ -99,7 +99,10 @@ case class LogDeleteRecordsResult(requestedOffset: Long, lowWatermark: Long, exc | |||
|   } | ||||
| } | ||||
| 
 | ||||
| case class StopPartition(topicPartition: TopicPartition, deleteLocalLog: Boolean, deleteRemoteLog: Boolean = false) | ||||
| case class StopPartition(topicPartition: TopicPartition, | ||||
|                          deleteLocalLog: Boolean, | ||||
|                          deleteRemoteLog: Boolean = false, | ||||
|                          stopRemoteLogMetadataManager: Boolean = false) | ||||
| 
 | ||||
| /** | ||||
|  * Result metadata of a log read operation on the log | ||||
|  |  | |||
|  | @ -163,7 +163,8 @@ class AdminZkClient(zkClient: KafkaZkClient, | |||
| 
 | ||||
|     LogConfig.validate(Collections.emptyMap(), config, | ||||
|       kafkaConfig.map(_.extractLogConfigMap).getOrElse(Collections.emptyMap()), | ||||
|       kafkaConfig.exists(_.remoteLogManagerConfig.isRemoteStorageSystemEnabled())) | ||||
|       kafkaConfig.exists(_.remoteLogManagerConfig.isRemoteStorageSystemEnabled()), | ||||
|       true) | ||||
|   } | ||||
| 
 | ||||
|   private def writeTopicPartitionAssignment(topic: String, replicaAssignment: Map[Int, ReplicaAssignment], | ||||
|  | @ -481,7 +482,7 @@ class AdminZkClient(zkClient: KafkaZkClient, | |||
|     // remove the topic overrides | ||||
|     LogConfig.validate(Collections.emptyMap(), configs, | ||||
|       kafkaConfig.map(_.extractLogConfigMap).getOrElse(Collections.emptyMap()), | ||||
|       kafkaConfig.exists(_.remoteLogManagerConfig.isRemoteStorageSystemEnabled())) | ||||
|       kafkaConfig.exists(_.remoteLogManagerConfig.isRemoteStorageSystemEnabled()), true) | ||||
|   } | ||||
| 
 | ||||
|   /** | ||||
|  |  | |||
|  | @ -1309,12 +1309,12 @@ public class RemoteLogManagerTest { | |||
|         verifyInCache(followerTopicIdPartition, leaderTopicIdPartition); | ||||
| 
 | ||||
|         // Evicts from topicId cache | ||||
|         remoteLogManager.stopPartitions(Collections.singleton(new StopPartition(leaderTopicIdPartition.topicPartition(), true, true)), (tp, ex) -> { }); | ||||
|         remoteLogManager.stopPartitions(Collections.singleton(new StopPartition(leaderTopicIdPartition.topicPartition(), true, true, true)), (tp, ex) -> { }); | ||||
|         verifyNotInCache(leaderTopicIdPartition); | ||||
|         verifyInCache(followerTopicIdPartition); | ||||
| 
 | ||||
|         // Evicts from topicId cache | ||||
|         remoteLogManager.stopPartitions(Collections.singleton(new StopPartition(followerTopicIdPartition.topicPartition(), true, true)), (tp, ex) -> { }); | ||||
|         remoteLogManager.stopPartitions(Collections.singleton(new StopPartition(followerTopicIdPartition.topicPartition(), true, true, true)), (tp, ex) -> { }); | ||||
|         verifyNotInCache(leaderTopicIdPartition, followerTopicIdPartition); | ||||
|     } | ||||
| 
 | ||||
|  | @ -1344,7 +1344,7 @@ public class RemoteLogManagerTest { | |||
| 
 | ||||
|         spyRemoteLogManager.onLeadershipChange( | ||||
|             Collections.singleton(mockPartition(leaderTopicIdPartition)), Collections.emptySet(), topicIds); | ||||
|         verify(spyRemoteLogManager).doHandleLeaderPartition(eq(leaderTopicIdPartition)); | ||||
|         verify(spyRemoteLogManager).doHandleLeaderPartition(eq(leaderTopicIdPartition), eq(false)); | ||||
|     } | ||||
| 
 | ||||
|     private MemoryRecords records(long timestamp, | ||||
|  | @ -1837,8 +1837,8 @@ public class RemoteLogManagerTest { | |||
|         remoteLogManager.startup(); | ||||
|         BiConsumer<TopicPartition, Throwable> errorHandler = (topicPartition, throwable) -> fail("shouldn't be called"); | ||||
|         Set<StopPartition> partitions = new HashSet<>(); | ||||
|         partitions.add(new StopPartition(leaderTopicIdPartition.topicPartition(), true, false)); | ||||
|         partitions.add(new StopPartition(followerTopicIdPartition.topicPartition(), true, false)); | ||||
|         partitions.add(new StopPartition(leaderTopicIdPartition.topicPartition(), true, false, false)); | ||||
|         partitions.add(new StopPartition(followerTopicIdPartition.topicPartition(), true, false, false)); | ||||
|         remoteLogManager.onLeadershipChange(Collections.singleton(mockPartition(leaderTopicIdPartition)), | ||||
|                 Collections.singleton(mockPartition(followerTopicIdPartition)), topicIds); | ||||
|         assertNotNull(remoteLogManager.leaderCopyTask(leaderTopicIdPartition)); | ||||
|  | @ -1860,8 +1860,8 @@ public class RemoteLogManagerTest { | |||
|         BiConsumer<TopicPartition, Throwable> errorHandler = | ||||
|                 (topicPartition, ex) -> fail("shouldn't be called: " + ex); | ||||
|         Set<StopPartition> partitions = new HashSet<>(); | ||||
|         partitions.add(new StopPartition(leaderTopicIdPartition.topicPartition(), true, true)); | ||||
|         partitions.add(new StopPartition(followerTopicIdPartition.topicPartition(), true, true)); | ||||
|         partitions.add(new StopPartition(leaderTopicIdPartition.topicPartition(), true, true, true)); | ||||
|         partitions.add(new StopPartition(followerTopicIdPartition.topicPartition(), true, true, true)); | ||||
|         remoteLogManager.onLeadershipChange(Collections.singleton(mockPartition(leaderTopicIdPartition)), | ||||
|                 Collections.singleton(mockPartition(followerTopicIdPartition)), topicIds); | ||||
|         assertNotNull(remoteLogManager.leaderCopyTask(leaderTopicIdPartition)); | ||||
|  | @ -3208,6 +3208,7 @@ public class RemoteLogManagerTest { | |||
|         when(partition.topic()).thenReturn(tp.topic()); | ||||
|         when(log.remoteLogEnabled()).thenReturn(true); | ||||
|         when(partition.log()).thenReturn(Option.apply(log)); | ||||
|         when(log.config()).thenReturn(new LogConfig(new Properties())); | ||||
|         return partition; | ||||
|     } | ||||
| 
 | ||||
|  |  | |||
|  | @ -156,12 +156,13 @@ class RemoteTopicCrudTest extends IntegrationTestHarness { | |||
|         topicConfig = topicConfig)) | ||||
|   } | ||||
| 
 | ||||
|   // `remote.log.delete.on.disable` and `remote.log.copy.disable` only works in KRaft mode. | ||||
|   @ParameterizedTest | ||||
|   @CsvSource(Array("zk,retain", "zk,delete", "kraft,retain", "kraft,delete")) | ||||
|   def testCreateRemoteTopicWithDisablePolicyRetain(quorum: String, policy: String): Unit = { | ||||
|   @CsvSource(Array("kraft,true,true", "kraft,true,false", "kraft,false,true", "kraft,false,false")) | ||||
|   def testCreateRemoteTopicWithCopyDisabledAndDeleteOnDisable(quorum: String, copyDisabled: Boolean, deleteOnDisable: Boolean): Unit = { | ||||
|     val topicConfig = new Properties() | ||||
|     topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true") | ||||
|     topicConfig.put(TopicConfig.REMOTE_LOG_DISABLE_POLICY_CONFIG, policy) | ||||
|     topicConfig.put(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, copyDisabled.toString) | ||||
|     topicConfig.put(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, deleteOnDisable.toString) | ||||
|     TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, controllerServers, numPartitions, numReplicationFactor, | ||||
|       topicConfig = topicConfig) | ||||
|     verifyRemoteLogTopicConfigs(topicConfig) | ||||
|  | @ -311,6 +312,35 @@ class RemoteTopicCrudTest extends IntegrationTestHarness { | |||
|       () => admin.incrementalAlterConfigs(configs).all().get(), "Disabling remote storage feature on the topic level is not supported.") | ||||
|   } | ||||
| 
 | ||||
|   @ParameterizedTest | ||||
|   @ValueSource(strings = Array("zk")) | ||||
|   def testUpdateInvalidRemoteStorageConfigUnderZK(quorum: String): Unit = { | ||||
|     val admin = createAdminClient() | ||||
|     val errorMsg = "It is invalid to set `remote.log.delete.on.disable` or `remote.log.copy.disable` under Zookeeper's mode." | ||||
|     val topicConfig = new Properties | ||||
|     topicConfig.setProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true") | ||||
|     TestUtils.createTopicWithAdmin(admin, testTopicName, brokers, controllerServers, numPartitions, numReplicationFactor, | ||||
|       topicConfig = topicConfig) | ||||
| 
 | ||||
|     val configs = new util.HashMap[ConfigResource, util.Collection[AlterConfigOp]]() | ||||
|     configs.put(new ConfigResource(ConfigResource.Type.TOPIC, testTopicName), | ||||
|       util.Arrays.asList( | ||||
|         new AlterConfigOp(new ConfigEntry(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, "true"), | ||||
|           AlterConfigOp.OpType.SET), | ||||
|       )) | ||||
|     assertThrowsException(classOf[InvalidConfigurationException], | ||||
|       () => admin.incrementalAlterConfigs(configs).all().get(), errorMsg) | ||||
| 
 | ||||
|     configs.clear() | ||||
|     configs.put(new ConfigResource(ConfigResource.Type.TOPIC, testTopicName), | ||||
|       util.Arrays.asList( | ||||
|         new AlterConfigOp(new ConfigEntry(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, "true"), | ||||
|           AlterConfigOp.OpType.SET), | ||||
|       )) | ||||
|     assertThrowsException(classOf[InvalidConfigurationException], | ||||
|       () => admin.incrementalAlterConfigs(configs).all().get(), errorMsg) | ||||
|   } | ||||
| 
 | ||||
|   @ParameterizedTest | ||||
|   @ValueSource(strings = Array("zk", "kraft")) | ||||
|   def testTopicDeletion(quorum: String): Unit = { | ||||
|  | @ -409,10 +439,15 @@ class RemoteTopicCrudTest extends IntegrationTestHarness { | |||
|             topicConfig.getProperty(TopicConfig.RETENTION_BYTES_CONFIG).toLong == | ||||
|               logBuffer.head.config.retentionSize | ||||
|         } | ||||
|         if (topicConfig.contains(TopicConfig.REMOTE_LOG_DISABLE_POLICY_CONFIG)) { | ||||
|         if (topicConfig.contains(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG)) { | ||||
|           result = result && | ||||
|             topicConfig.getProperty(TopicConfig.REMOTE_LOG_DISABLE_POLICY_CONFIG).equals( | ||||
|               logBuffer.head.config.remoteLogDisablePolicy()) | ||||
|             topicConfig.getProperty(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG).toBoolean == | ||||
|               logBuffer.head.config.remoteLogCopyDisable() | ||||
|         } | ||||
|         if (topicConfig.contains(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG)) { | ||||
|           result = result && | ||||
|             topicConfig.getProperty(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG).toBoolean == | ||||
|               logBuffer.head.config.remoteLogDeleteOnDisable() | ||||
|         } | ||||
|       } | ||||
|       result | ||||
|  |  | |||
|  | @ -100,7 +100,8 @@ class LogConfigTest { | |||
|       case TopicConfig.COMPRESSION_GZIP_LEVEL_CONFIG => assertPropertyInvalid(name, "not_a_number", "-2") | ||||
|       case TopicConfig.COMPRESSION_LZ4_LEVEL_CONFIG => assertPropertyInvalid(name, "not_a_number", "-1") | ||||
|       case TopicConfig.COMPRESSION_ZSTD_LEVEL_CONFIG => assertPropertyInvalid(name, "not_a_number", "-0.1") | ||||
|       case TopicConfig.REMOTE_LOG_DISABLE_POLICY_CONFIG => assertPropertyInvalid(name, "not_a_number", "remove", "0", "true") | ||||
|       case TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG => assertPropertyInvalid(name, "not_a_number", "remove", "0") | ||||
|       case TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG => assertPropertyInvalid(name, "not_a_number", "remove", "0") | ||||
| 
 | ||||
|       case _ => assertPropertyInvalid(name, "not_a_number", "-1") | ||||
|     }) | ||||
|  | @ -300,7 +301,7 @@ class LogConfigTest { | |||
|     props.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, localRetentionMs.toString) | ||||
|     props.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, localRetentionBytes.toString) | ||||
|     assertThrows(classOf[ConfigException], | ||||
|       () => LogConfig.validate(Collections.emptyMap(), props, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())) | ||||
|       () => LogConfig.validate(Collections.emptyMap(), props, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true)) | ||||
|   } | ||||
| 
 | ||||
|   @Test | ||||
|  | @ -312,17 +313,17 @@ class LogConfigTest { | |||
|     val logProps = new Properties() | ||||
|     logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE) | ||||
|     logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true") | ||||
|     LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) | ||||
|     LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true) | ||||
| 
 | ||||
|     logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT) | ||||
|     assertThrows(classOf[ConfigException], | ||||
|       () => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())) | ||||
|       () => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true)) | ||||
|     logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, "delete,compact") | ||||
|     assertThrows(classOf[ConfigException], | ||||
|       () => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())) | ||||
|       () => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true)) | ||||
|     logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, "compact,delete") | ||||
|     assertThrows(classOf[ConfigException], | ||||
|       () => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())) | ||||
|       () => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true)) | ||||
|   } | ||||
| 
 | ||||
|   @ParameterizedTest(name = "testEnableRemoteLogStorage with sysRemoteStorageEnabled: {0}") | ||||
|  | @ -335,10 +336,10 @@ class LogConfigTest { | |||
|     val logProps = new Properties() | ||||
|     logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true") | ||||
|     if (sysRemoteStorageEnabled) { | ||||
|       LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) | ||||
|       LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true) | ||||
|     } else { | ||||
|       val message = assertThrows(classOf[ConfigException], | ||||
|         () => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())) | ||||
|         () => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true)) | ||||
|       assertTrue(message.getMessage.contains("Tiered Storage functionality is disabled in the broker")) | ||||
|     } | ||||
|   } | ||||
|  | @ -355,11 +356,20 @@ class LogConfigTest { | |||
|     if (wasRemoteStorageEnabled) { | ||||
|       val message = assertThrows(classOf[InvalidConfigurationException], | ||||
|         () => LogConfig.validate(Collections.singletonMap(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true"), | ||||
|           logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())) | ||||
|       assertTrue(message.getMessage.contains("Disabling remote storage feature on the topic level is not supported.")) | ||||
|           logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), false)) | ||||
|       assertTrue(message.getMessage.contains("It is invalid to disable remote storage without deleting remote data. " + | ||||
|         "If you want to keep the remote data and turn to read only, please set `remote.storage.enable=true,remote.log.copy.disable=true`. " + | ||||
|         "If you want to disable remote storage and delete all remote data, please set `remote.storage.enable=false,remote.log.delete.on.disable=true`.")) | ||||
| 
 | ||||
| 
 | ||||
|       // It should be able to disable the remote log storage when delete on disable is set to true | ||||
|       logProps.put(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, "true") | ||||
|       LogConfig.validate(Collections.singletonMap(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true"), | ||||
|         logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), false) | ||||
|     } else { | ||||
|       LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) | ||||
|       LogConfig.validate(Collections.singletonMap(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false"), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) | ||||
|       LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true) | ||||
|       LogConfig.validate(Collections.singletonMap(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false"), logProps, | ||||
|         kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), false) | ||||
|     } | ||||
|   } | ||||
| 
 | ||||
|  | @ -378,10 +388,12 @@ class LogConfigTest { | |||
|     logProps.put(TopicConfig.RETENTION_MS_CONFIG, "500") | ||||
|     if (sysRemoteStorageEnabled) { | ||||
|       val message = assertThrows(classOf[ConfigException], | ||||
|         () => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())) | ||||
|         () => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, | ||||
|           kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true)) | ||||
|       assertTrue(message.getMessage.contains(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG)) | ||||
|     } else { | ||||
|       LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) | ||||
|       LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, | ||||
|         kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true) | ||||
|     } | ||||
|   } | ||||
| 
 | ||||
|  | @ -400,10 +412,12 @@ class LogConfigTest { | |||
|     logProps.put(TopicConfig.RETENTION_BYTES_CONFIG, "128") | ||||
|     if (sysRemoteStorageEnabled) { | ||||
|       val message = assertThrows(classOf[ConfigException], | ||||
|         () => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())) | ||||
|         () => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, | ||||
|           kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true)) | ||||
|       assertTrue(message.getMessage.contains(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG)) | ||||
|     } else { | ||||
|       LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) | ||||
|       LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, | ||||
|         kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true) | ||||
|     } | ||||
|   } | ||||
| 
 | ||||
|  | @ -426,19 +440,34 @@ class LogConfigTest { | |||
|   } | ||||
| 
 | ||||
|   @ParameterizedTest | ||||
|   @ValueSource(strings = Array(TopicConfig.REMOTE_LOG_DISABLE_POLICY_RETAIN, TopicConfig.REMOTE_LOG_DISABLE_POLICY_DELETE)) | ||||
|   def testValidRemoteLogDisablePolicy(policy: String): Unit = { | ||||
|   @ValueSource(booleans = Array(true, false)) | ||||
|   def testValidRemoteLogCopyDisabled(copyDisabled: Boolean): Unit = { | ||||
|     val logProps = new Properties | ||||
|     logProps.put(TopicConfig.REMOTE_LOG_DISABLE_POLICY_CONFIG, policy) | ||||
|     logProps.put(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, copyDisabled.toString) | ||||
|     LogConfig.validate(logProps) | ||||
|   } | ||||
| 
 | ||||
|   @ParameterizedTest | ||||
|   @ValueSource(strings = Array("keep", "remove")) | ||||
|   def testInvalidRemoteLogDisablePolicy(policy: String): Unit = { | ||||
|   @ValueSource(booleans = Array(true, false)) | ||||
|   def testValidRemoteLogDeleteOnDisable(deleteOnDisable: Boolean): Unit = { | ||||
|     val logProps = new Properties | ||||
|     logProps.put(TopicConfig.REMOTE_LOG_DISABLE_POLICY_CONFIG, policy) | ||||
|     assertThrows(classOf[ConfigException], () => LogConfig.validate(logProps)) | ||||
|     logProps.put(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, deleteOnDisable.toString) | ||||
|     LogConfig.validate(logProps) | ||||
|   } | ||||
| 
 | ||||
|   @ParameterizedTest | ||||
|   @ValueSource(strings = Array(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG)) | ||||
|   def testInValidRemoteConfigsInZK(configKey: String): Unit = { | ||||
|     val kafkaProps = TestUtils.createDummyBrokerConfig() | ||||
|     kafkaProps.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true") | ||||
|     val kafkaConfig = KafkaConfig.fromProps(kafkaProps) | ||||
|     val logProps = new Properties | ||||
|     logProps.put(configKey, "true") | ||||
| 
 | ||||
|     val message = assertThrows(classOf[InvalidConfigurationException], | ||||
|       () => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, true, true)) | ||||
|     assertTrue(message.getMessage.contains("It is invalid to set `remote.log.delete.on.disable` or " + | ||||
|       "`remote.log.copy.disable` under Zookeeper's mode.")) | ||||
|   } | ||||
| 
 | ||||
|   /* Verify that when the deprecated config LOG_MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG has non default value the new configs | ||||
|  |  | |||
|  | @ -797,7 +797,7 @@ class LogManagerTest { | |||
|     val newProperties = new Properties() | ||||
|     newProperties.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE) | ||||
| 
 | ||||
|     spyLogManager.updateTopicConfig(topic, newProperties, isRemoteLogStorageSystemEnabled = false) | ||||
|     spyLogManager.updateTopicConfig(topic, newProperties, isRemoteLogStorageSystemEnabled = false, wasRemoteLogEnabled = false, fromZK = false) | ||||
| 
 | ||||
|     assertTrue(log0.config.delete) | ||||
|     assertTrue(log1.config.delete) | ||||
|  |  | |||
|  | @ -93,7 +93,9 @@ class ControllerConfigurationValidatorTest { | |||
|     val config = new util.TreeMap[String, String]() | ||||
|     config.put(REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false") | ||||
|     if (wasRemoteStorageEnabled) { | ||||
|       assertEquals("Disabling remote storage feature on the topic level is not supported.", | ||||
|       assertEquals("It is invalid to disable remote storage without deleting remote data. " + | ||||
|         "If you want to keep the remote data and turn to read only, please set `remote.storage.enable=true,remote.log.copy.disable=true`. " + | ||||
|         "If you want to disable remote storage and delete all remote data, please set `remote.storage.enable=false,remote.log.delete.on.disable=true`.", | ||||
|         assertThrows(classOf[InvalidConfigurationException], () => validator.validate( | ||||
|           new ConfigResource(TOPIC, "foo"), config, util.Collections.singletonMap(REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true"))).getMessage) | ||||
|     } else { | ||||
|  |  | |||
|  | @ -38,6 +38,7 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal | |||
| import org.apache.kafka.common.{TopicPartition, Uuid} | ||||
| import org.apache.kafka.server.common.MetadataVersion.IBP_3_0_IV1 | ||||
| import org.apache.kafka.server.config.{ConfigType, QuotaConfigs, ServerLogConfigs, ZooKeeperInternals} | ||||
| import org.apache.kafka.storage.internals.log.LogConfig | ||||
| import org.junit.jupiter.api.Assertions._ | ||||
| import org.junit.jupiter.api.{Test, Timeout} | ||||
| import org.junit.jupiter.params.ParameterizedTest | ||||
|  | @ -659,6 +660,7 @@ class DynamicConfigChangeUnitTest { | |||
|     when(log0.remoteLogEnabled()).thenReturn(true) | ||||
|     when(partition0.isLeader).thenReturn(true) | ||||
|     when(replicaManager.onlinePartition(tp0)).thenReturn(Some(partition0)) | ||||
|     when(log0.config).thenReturn(new LogConfig(Collections.emptyMap())) | ||||
| 
 | ||||
|     val tp1 = new TopicPartition(topic, 1) | ||||
|     val log1: UnifiedLog = mock(classOf[UnifiedLog]) | ||||
|  | @ -667,6 +669,7 @@ class DynamicConfigChangeUnitTest { | |||
|     when(log1.remoteLogEnabled()).thenReturn(true) | ||||
|     when(partition1.isLeader).thenReturn(false) | ||||
|     when(replicaManager.onlinePartition(tp1)).thenReturn(Some(partition1)) | ||||
|     when(log1.config).thenReturn(new LogConfig(Collections.emptyMap())) | ||||
| 
 | ||||
|     val leaderPartitionsArg: ArgumentCaptor[util.Set[Partition]] = ArgumentCaptor.forClass(classOf[util.Set[Partition]]) | ||||
|     val followerPartitionsArg: ArgumentCaptor[util.Set[Partition]] = ArgumentCaptor.forClass(classOf[util.Set[Partition]]) | ||||
|  | @ -674,7 +677,7 @@ class DynamicConfigChangeUnitTest { | |||
| 
 | ||||
|     val isRemoteLogEnabledBeforeUpdate = false | ||||
|     val configHandler: TopicConfigHandler = new TopicConfigHandler(replicaManager, null, null, None) | ||||
|     configHandler.maybeBootstrapRemoteLogComponents(topic, Seq(log0, log1), isRemoteLogEnabledBeforeUpdate) | ||||
|     configHandler.maybeUpdateRemoteLogComponents(topic, Seq(log0, log1), isRemoteLogEnabledBeforeUpdate, false) | ||||
|     assertEquals(Collections.singleton(partition0), leaderPartitionsArg.getValue) | ||||
|     assertEquals(Collections.singleton(partition1), followerPartitionsArg.getValue) | ||||
|   } | ||||
|  | @ -682,17 +685,23 @@ class DynamicConfigChangeUnitTest { | |||
|   @Test | ||||
|   def testEnableRemoteLogStorageOnTopicOnAlreadyEnabledTopic(): Unit = { | ||||
|     val topic = "test-topic" | ||||
|     val tp0 = new TopicPartition(topic, 0) | ||||
|     val rlm: RemoteLogManager = mock(classOf[RemoteLogManager]) | ||||
|     val replicaManager: ReplicaManager = mock(classOf[ReplicaManager]) | ||||
|     val partition: Partition = mock(classOf[Partition]) | ||||
|     when(replicaManager.remoteLogManager).thenReturn(Some(rlm)) | ||||
|     when(replicaManager.onlinePartition(tp0)).thenReturn(Some(partition)) | ||||
| 
 | ||||
|     val log0: UnifiedLog = mock(classOf[UnifiedLog]) | ||||
|     when(log0.remoteLogEnabled()).thenReturn(true) | ||||
|     doNothing().when(rlm).onLeadershipChange(any(), any(), any()) | ||||
|     when(log0.config).thenReturn(new LogConfig(Collections.emptyMap())) | ||||
|     when(log0.topicPartition).thenReturn(tp0) | ||||
|     when(partition.isLeader).thenReturn(true) | ||||
| 
 | ||||
|     val isRemoteLogEnabledBeforeUpdate = true | ||||
|     val configHandler: TopicConfigHandler = new TopicConfigHandler(replicaManager, null, null, None) | ||||
|     configHandler.maybeBootstrapRemoteLogComponents(topic, Seq(log0), isRemoteLogEnabledBeforeUpdate) | ||||
|     configHandler.maybeUpdateRemoteLogComponents(topic, Seq(log0), isRemoteLogEnabledBeforeUpdate, false) | ||||
|     verify(rlm, never()).onLeadershipChange(any(), any(), any()) | ||||
|   } | ||||
| } | ||||
|  |  | |||
|  | @ -113,13 +113,15 @@ public class LogConfig extends AbstractConfig { | |||
|     public static class RemoteLogConfig { | ||||
| 
 | ||||
|         public final boolean remoteStorageEnable; | ||||
|         public final String remoteLogDisablePolicy; | ||||
|         public final boolean remoteLogDeleteOnDisable; | ||||
|         public final boolean remoteLogCopyDisable; | ||||
|         public final long localRetentionMs; | ||||
|         public final long localRetentionBytes; | ||||
| 
 | ||||
|         private RemoteLogConfig(LogConfig config) { | ||||
|             this.remoteStorageEnable = config.getBoolean(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG); | ||||
|             this.remoteLogDisablePolicy = config.getString(TopicConfig.REMOTE_LOG_DISABLE_POLICY_CONFIG); | ||||
|             this.remoteLogCopyDisable = config.getBoolean(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG); | ||||
|             this.remoteLogDeleteOnDisable = config.getBoolean(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG); | ||||
|             this.localRetentionMs = config.getLong(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG); | ||||
|             this.localRetentionBytes = config.getLong(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG); | ||||
|         } | ||||
|  | @ -128,7 +130,8 @@ public class LogConfig extends AbstractConfig { | |||
|         public String toString() { | ||||
|             return "RemoteLogConfig{" + | ||||
|                     "remoteStorageEnable=" + remoteStorageEnable + | ||||
|                     ", remoteLogDisablePolicy=" + remoteLogDisablePolicy + | ||||
|                     ", remoteLogCopyDisable=" + remoteLogCopyDisable + | ||||
|                     ", remoteLogDeleteOnDisable=" + remoteLogDeleteOnDisable + | ||||
|                     ", localRetentionMs=" + localRetentionMs + | ||||
|                     ", localRetentionBytes=" + localRetentionBytes + | ||||
|                     '}'; | ||||
|  | @ -204,7 +207,8 @@ public class LogConfig extends AbstractConfig { | |||
|     // Visible for testing | ||||
|     public static final Set<String> CONFIGS_WITH_NO_SERVER_DEFAULTS = Collections.unmodifiableSet(Utils.mkSet( | ||||
|             TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, | ||||
|             TopicConfig.REMOTE_LOG_DISABLE_POLICY_CONFIG, | ||||
|             TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, | ||||
|             TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, | ||||
|             QuotaConfigs.LEADER_REPLICATION_THROTTLED_REPLICAS_CONFIG, | ||||
|             QuotaConfigs.FOLLOWER_REPLICATION_THROTTLED_REPLICAS_CONFIG | ||||
|     )); | ||||
|  | @ -325,9 +329,8 @@ public class LogConfig extends AbstractConfig { | |||
|                         TopicConfig.LOCAL_LOG_RETENTION_MS_DOC) | ||||
|                 .define(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, LONG, DEFAULT_LOCAL_RETENTION_BYTES, atLeast(-2), MEDIUM, | ||||
|                         TopicConfig.LOCAL_LOG_RETENTION_BYTES_DOC) | ||||
|                 .define(TopicConfig.REMOTE_LOG_DISABLE_POLICY_CONFIG, STRING, TopicConfig.REMOTE_LOG_DISABLE_POLICY_RETAIN, | ||||
|                         in(TopicConfig.REMOTE_LOG_DISABLE_POLICY_RETAIN, TopicConfig.REMOTE_LOG_DISABLE_POLICY_DELETE), | ||||
|                         MEDIUM, TopicConfig.REMOTE_LOG_DISABLE_POLICY_DOC); | ||||
|                 .define(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, BOOLEAN, false, MEDIUM, TopicConfig.REMOTE_LOG_COPY_DISABLE_DOC) | ||||
|                 .define(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, BOOLEAN, false, MEDIUM, TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_DOC); | ||||
|     } | ||||
| 
 | ||||
|     public final Set<String> overriddenConfigs; | ||||
|  | @ -508,8 +511,12 @@ public class LogConfig extends AbstractConfig { | |||
|         return remoteLogConfig.remoteStorageEnable; | ||||
|     } | ||||
| 
 | ||||
|     public String remoteLogDisablePolicy() { | ||||
|         return remoteLogConfig.remoteLogDisablePolicy; | ||||
|     public Boolean remoteLogDeleteOnDisable() { | ||||
|         return remoteLogConfig.remoteLogDeleteOnDisable; | ||||
|     } | ||||
| 
 | ||||
|     public Boolean remoteLogCopyDisable() { | ||||
|         return remoteLogConfig.remoteLogCopyDisable; | ||||
|     } | ||||
| 
 | ||||
|     public long localRetentionMs() { | ||||
|  | @ -613,11 +620,17 @@ public class LogConfig extends AbstractConfig { | |||
|      * @param existingConfigs                   The existing properties | ||||
|      * @param newConfigs                        The new properties to be validated | ||||
|      * @param isRemoteLogStorageSystemEnabled   true if system wise remote log storage is enabled | ||||
|      * @param fromZK                            true if this is a ZK cluster | ||||
|      */ | ||||
|     private static void validateTopicLogConfigValues(Map<String, String> existingConfigs, | ||||
|                                                      Map<?, ?> newConfigs, | ||||
|                                                      boolean isRemoteLogStorageSystemEnabled) { | ||||
|                                                      boolean isRemoteLogStorageSystemEnabled, | ||||
|                                                      boolean fromZK) { | ||||
|         validateValues(newConfigs); | ||||
| 
 | ||||
|         if (fromZK) { | ||||
|             validateNoInvalidRemoteStorageConfigsInZK(newConfigs); | ||||
|         } | ||||
|         boolean isRemoteLogStorageEnabled = (Boolean) newConfigs.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG); | ||||
|         if (isRemoteLogStorageEnabled) { | ||||
|             validateRemoteStorageOnlyIfSystemEnabled(newConfigs, isRemoteLogStorageSystemEnabled, false); | ||||
|  | @ -626,14 +639,26 @@ public class LogConfig extends AbstractConfig { | |||
|             validateRemoteStorageRetentionTime(newConfigs); | ||||
|         } else { | ||||
|             // The new config "remote.storage.enable" is false, validate if it's turning from true to false | ||||
|             validateNotTurningOffRemoteStorage(existingConfigs); | ||||
|             boolean wasRemoteLogEnabled = Boolean.parseBoolean(existingConfigs.getOrDefault(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false")); | ||||
|             validateTurningOffRemoteStorageWithDelete(newConfigs, wasRemoteLogEnabled, isRemoteLogStorageEnabled); | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     public static void validateNotTurningOffRemoteStorage(Map<String, String> existingConfigs) { | ||||
|         boolean wasRemoteLogEnabledBeforeUpdate = Boolean.parseBoolean(existingConfigs.getOrDefault(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false")); | ||||
|         if (wasRemoteLogEnabledBeforeUpdate) { | ||||
|             throw new InvalidConfigurationException("Disabling remote storage feature on the topic level is not supported."); | ||||
|     public static void validateTurningOffRemoteStorageWithDelete(Map<?, ?> newConfigs, boolean wasRemoteLogEnabled, boolean isRemoteLogStorageEnabled) { | ||||
|         boolean isRemoteLogDeleteOnDisable = (Boolean) Utils.castToStringObjectMap(newConfigs).getOrDefault(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, false); | ||||
|         if (wasRemoteLogEnabled && !isRemoteLogStorageEnabled && !isRemoteLogDeleteOnDisable) { | ||||
|             throw new InvalidConfigurationException("It is invalid to disable remote storage without deleting remote data. " + | ||||
|                     "If you want to keep the remote data and turn to read only, please set `remote.storage.enable=true,remote.log.copy.disable=true`. " + | ||||
|                     "If you want to disable remote storage and delete all remote data, please set `remote.storage.enable=false,remote.log.delete.on.disable=true`."); | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     public static void validateNoInvalidRemoteStorageConfigsInZK(Map<?, ?> newConfigs) { | ||||
|         boolean isRemoteLogDeleteOnDisable = (Boolean) Utils.castToStringObjectMap(newConfigs).getOrDefault(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, false); | ||||
|         boolean isRemoteLogCopyDisabled = (Boolean) Utils.castToStringObjectMap(newConfigs).getOrDefault(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, false); | ||||
|         if (isRemoteLogDeleteOnDisable || isRemoteLogCopyDisabled) { | ||||
|             throw new InvalidConfigurationException("It is invalid to set `remote.log.delete.on.disable` or " + | ||||
|                     "`remote.log.copy.disable` under Zookeeper's mode."); | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|  | @ -694,13 +719,14 @@ public class LogConfig extends AbstractConfig { | |||
|      * Check that the given properties contain only valid log config names and that all values can be parsed and are valid | ||||
|      */ | ||||
|     public static void validate(Properties props) { | ||||
|         validate(Collections.emptyMap(), props, Collections.emptyMap(), false); | ||||
|         validate(Collections.emptyMap(), props, Collections.emptyMap(), false, false); | ||||
|     } | ||||
| 
 | ||||
|     public static void validate(Map<String, String> existingConfigs, | ||||
|                                 Properties props, | ||||
|                                 Map<?, ?> configuredProps, | ||||
|                                 boolean isRemoteLogStorageSystemEnabled) { | ||||
|                                 boolean isRemoteLogStorageSystemEnabled, | ||||
|                                 boolean fromZK) { | ||||
|         validateNames(props); | ||||
|         if (configuredProps == null || configuredProps.isEmpty()) { | ||||
|             Map<?, ?> valueMaps = CONFIG.parse(props); | ||||
|  | @ -709,7 +735,7 @@ public class LogConfig extends AbstractConfig { | |||
|             Map<Object, Object> combinedConfigs = new HashMap<>(configuredProps); | ||||
|             combinedConfigs.putAll(props); | ||||
|             Map<?, ?> valueMaps = CONFIG.parse(combinedConfigs); | ||||
|             validateTopicLogConfigValues(existingConfigs, valueMaps, isRemoteLogStorageSystemEnabled); | ||||
|             validateTopicLogConfigValues(existingConfigs, valueMaps, isRemoteLogStorageSystemEnabled, fromZK); | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|  |  | |||
|  | @ -0,0 +1,120 @@ | |||
| /* | ||||
|  * Licensed to the Apache Software Foundation (ASF) under one or more | ||||
|  * contributor license agreements. See the NOTICE file distributed with | ||||
|  * this work for additional information regarding copyright ownership. | ||||
|  * The ASF licenses this file to You under the Apache License, Version 2.0 | ||||
|  * (the "License"); you may not use this file except in compliance with | ||||
|  * the License. You may obtain a copy of the License at | ||||
|  * | ||||
|  *    http://www.apache.org/licenses/LICENSE-2.0 | ||||
|  * | ||||
|  * Unless required by applicable law or agreed to in writing, software | ||||
|  * distributed under the License is distributed on an "AS IS" BASIS, | ||||
|  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
|  * See the License for the specific language governing permissions and | ||||
|  * limitations under the License. | ||||
|  */ | ||||
| package org.apache.kafka.tiered.storage.integration; | ||||
| 
 | ||||
| import org.apache.kafka.common.config.TopicConfig; | ||||
| import org.apache.kafka.tiered.storage.TieredStorageTestBuilder; | ||||
| import org.apache.kafka.tiered.storage.TieredStorageTestHarness; | ||||
| import org.apache.kafka.tiered.storage.specs.KeyValueSpec; | ||||
| 
 | ||||
| import org.junit.jupiter.params.ParameterizedTest; | ||||
| import org.junit.jupiter.params.provider.ValueSource; | ||||
| 
 | ||||
| import java.util.Arrays; | ||||
| import java.util.Collections; | ||||
| import java.util.HashMap; | ||||
| import java.util.List; | ||||
| import java.util.Map; | ||||
| 
 | ||||
| import static org.apache.kafka.common.utils.Utils.mkEntry; | ||||
| import static org.apache.kafka.common.utils.Utils.mkMap; | ||||
| import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.DELETE_SEGMENT; | ||||
| 
 | ||||
| public final class DisableRemoteLogOnTopicTest extends TieredStorageTestHarness { | ||||
| 
 | ||||
|     @Override | ||||
|     public int brokerCount() { | ||||
|         return 2; | ||||
|     } | ||||
| 
 | ||||
|     @ParameterizedTest(name = "{displayName}.quorum={0}") | ||||
|     @ValueSource(strings = {"kraft"}) | ||||
|     public void executeTieredStorageTest(String quorum) { | ||||
|         super.executeTieredStorageTest(quorum); | ||||
|     } | ||||
| 
 | ||||
|     @Override | ||||
|     protected void writeTestSpecifications(TieredStorageTestBuilder builder) { | ||||
|         final Integer broker0 = 0; | ||||
|         final Integer broker1 = 1; | ||||
|         final String topicA = "topicA"; | ||||
|         final Integer p0 = 0; | ||||
|         final Integer partitionCount = 1; | ||||
|         final Integer replicationFactor = 2; | ||||
|         final Integer maxBatchCountPerSegment = 1; | ||||
|         final boolean enableRemoteLogStorage = true; | ||||
|         final Map<Integer, List<Integer>> assignment = mkMap( | ||||
|                 mkEntry(p0, Arrays.asList(broker0, broker1)) | ||||
|         ); | ||||
|         final Map<String, String> disableCopy = new HashMap<>(); | ||||
|         disableCopy.put(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, "true"); | ||||
| 
 | ||||
|         final Map<String, String> deleteOnDisable = new HashMap<>(); | ||||
|         deleteOnDisable.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false"); | ||||
|         deleteOnDisable.put(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, "true"); | ||||
| 
 | ||||
|         builder | ||||
|                 .createTopic(topicA, partitionCount, replicationFactor, maxBatchCountPerSegment, assignment, | ||||
|                         enableRemoteLogStorage) | ||||
|                 // send records to partition 0 | ||||
|                 .expectSegmentToBeOffloaded(broker0, topicA, p0, 0, new KeyValueSpec("k0", "v0")) | ||||
|                 .expectSegmentToBeOffloaded(broker0, topicA, p0, 1, new KeyValueSpec("k1", "v1")) | ||||
|                 .expectEarliestLocalOffsetInLogDirectory(topicA, p0, 2L) | ||||
|                 .produce(topicA, p0, new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1", "v1"), | ||||
|                         new KeyValueSpec("k2", "v2")) | ||||
|                 // disable remote log copy | ||||
|                 .updateTopicConfig(topicA, | ||||
|                         Collections.singletonMap(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, "true"), | ||||
|                         Collections.emptyList()) | ||||
| 
 | ||||
|                 // make sure we can still consume from the beginning of the topic to read data from local and remote storage | ||||
|                 .expectFetchFromTieredStorage(broker0, topicA, p0, 2) | ||||
|                 .consume(topicA, p0, 0L, 3, 2) | ||||
| 
 | ||||
|                 // re-enable remote log copy | ||||
|                 .updateTopicConfig(topicA, | ||||
|                         Collections.singletonMap(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, "false"), | ||||
|                         Collections.emptyList()) | ||||
| 
 | ||||
|                 // make sure the logs can be offloaded | ||||
|                 .expectEarliestLocalOffsetInLogDirectory(topicA, p0, 3L) | ||||
|                 .produce(topicA, p0, new KeyValueSpec("k3", "v3")) | ||||
| 
 | ||||
|                 // explicitly disable remote log copy | ||||
|                 .updateTopicConfig(topicA, | ||||
|                         disableCopy, | ||||
|                         Collections.emptyList()) | ||||
|                 // make sure we can still consume from the beginning of the topic to read data from local and remote storage | ||||
|                 .expectFetchFromTieredStorage(broker0, topicA, p0, 3) | ||||
|                 .consume(topicA, p0, 0L, 4, 3) | ||||
| 
 | ||||
|                 // verify the remote retention policy is working. | ||||
|                 // Use DELETE_RECORDS API to delete the records upto offset 1 and expect 1 remote segment to be deleted | ||||
|                 .expectDeletionInRemoteStorage(broker0, topicA, p0, DELETE_SEGMENT, 1) | ||||
|                 .deleteRecords(topicA, p0, 1L) | ||||
|                 .waitForRemoteLogSegmentDeletion(topicA) | ||||
| 
 | ||||
|                 // disabling remote log on topicA and enabling deleteOnDisable | ||||
|                 .updateTopicConfig(topicA, | ||||
|                         deleteOnDisable, | ||||
|                         Collections.emptyList()) | ||||
|                 // make sure all remote data is deleted | ||||
|                 .expectEmptyRemoteStorage(topicA, p0) | ||||
|                 // verify the local log is still consumable | ||||
|                 .consume(topicA, p0, 3L, 1, 0); | ||||
|     } | ||||
| } | ||||
		Loading…
	
		Reference in New Issue