diff --git a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java index 04c6c487cd0..b3cea19c551 100755 --- a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java +++ b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java @@ -95,7 +95,9 @@ public class TopicConfig { public static final String REMOTE_LOG_COPY_DISABLE_CONFIG = "remote.log.copy.disable"; public static final String REMOTE_LOG_COPY_DISABLE_DOC = "Determines whether tiered data for a topic should become read only," + - " and no more data uploading on a topic."; + " and no more data uploading on a topic. Once this config is set to true, the local retention configuration " + + "(i.e. local.retention.ms/bytes) becomes irrelevant, and all data expiration follows the topic-wide retention configuration" + + "(i.e. retention.ms/bytes)."; public static final String REMOTE_LOG_DELETE_ON_DISABLE_CONFIG = "remote.log.delete.on.disable"; public static final String REMOTE_LOG_DELETE_ON_DISABLE_DOC = "Determines whether tiered data for a topic should be " + diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index 66fedbfae6d..7c04dee1f78 100755 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -981,6 +981,7 @@ class LogManager(logDirs: Seq[File], LogConfig.validateNoInvalidRemoteStorageConfigsInZK(newLogConfig.values()) } LogConfig.validateTurningOffRemoteStorageWithDelete(newLogConfig.values(), wasRemoteLogEnabled, isRemoteLogStorageEnabled) + LogConfig.validateRetentionConfigsWhenRemoteCopyDisabled(newLogConfig.values(), isRemoteLogStorageEnabled) if (logs.nonEmpty) { logs.foreach { log => val oldLogConfig = log.updateConfig(newLogConfig) diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala index e6e7e469554..d251f9f01f7 100644 --- a/core/src/main/scala/kafka/log/UnifiedLog.scala +++ b/core/src/main/scala/kafka/log/UnifiedLog.scala @@ -1459,6 +1459,13 @@ class UnifiedLog(@volatile var logStartOffset: Long, } } + /** + * @return true if this topic enables tiered storage and remote log copy is enabled (i.e. remote.log.copy.disable=false) + */ + private def remoteLogEnabledAndRemoteCopyEnabled(): Boolean = { + remoteLogEnabled() && !config.remoteLogCopyDisable() + } + /** * Find segments starting from the oldest until the user-supplied predicate is false. * A final segment that is empty will never be returned. @@ -1473,7 +1480,8 @@ class UnifiedLog(@volatile var logStartOffset: Long, // Segments are eligible for deletion when: // 1. they are uploaded to the remote storage // 2. log-start-offset was incremented higher than the largest offset in the candidate segment - if (remoteLogEnabled()) { + // Note: when remote log copy is disabled, we will fall back to local log check using retention.ms/bytes + if (remoteLogEnabledAndRemoteCopyEnabled()) { (upperBoundOffset > 0 && upperBoundOffset - 1 <= highestOffsetInRemoteStorage()) || allowDeletionDueToLogStartOffsetIncremented } else { @@ -1518,7 +1526,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, } private def incrementStartOffset(startOffset: Long, reason: LogStartOffsetIncrementReason): Unit = { - if (remoteLogEnabled()) maybeIncrementLocalLogStartOffset(startOffset, reason) + if (remoteLogEnabledAndRemoteCopyEnabled()) maybeIncrementLocalLogStartOffset(startOffset, reason) else maybeIncrementLogStartOffset(startOffset, reason) } @@ -1566,7 +1574,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, } private def deleteRetentionMsBreachedSegments(): Int = { - val retentionMs = localRetentionMs(config, remoteLogEnabled()) + val retentionMs = localRetentionMs(config, remoteLogEnabledAndRemoteCopyEnabled()) if (retentionMs < 0) return 0 val startMs = time.milliseconds @@ -1578,7 +1586,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, } private def deleteRetentionSizeBreachedSegments(): Int = { - val retentionSize: Long = localRetentionSize(config, remoteLogEnabled()) + val retentionSize: Long = localRetentionSize(config, remoteLogEnabledAndRemoteCopyEnabled()) if (retentionSize < 0 || size < retentionSize) return 0 var diff = size - retentionSize def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): Boolean = { @@ -2334,12 +2342,12 @@ object UnifiedLog extends Logging { } } - private[log] def localRetentionMs(config: LogConfig, remoteLogEnabled: Boolean): Long = { - if (remoteLogEnabled) config.remoteLogConfig.localRetentionMs else config.retentionMs + private[log] def localRetentionMs(config: LogConfig, remoteLogEnabledAndRemoteCopyEnabled: Boolean): Long = { + if (remoteLogEnabledAndRemoteCopyEnabled) config.remoteLogConfig.localRetentionMs else config.retentionMs } - private[log] def localRetentionSize(config: LogConfig, remoteLogEnabled: Boolean): Long = { - if (remoteLogEnabled) config.remoteLogConfig.localRetentionBytes else config.retentionSize + private[log] def localRetentionSize(config: LogConfig, remoteLogEnabledAndRemoteCopyEnabled: Boolean): Long = { + if (remoteLogEnabledAndRemoteCopyEnabled) config.remoteLogConfig.localRetentionBytes else config.retentionSize } } diff --git a/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala b/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala index c88962cb845..2983ba0e307 100644 --- a/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala +++ b/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala @@ -168,6 +168,138 @@ class RemoteTopicCrudTest extends IntegrationTestHarness { verifyRemoteLogTopicConfigs(topicConfig) } + // `remote.log.delete.on.disable` only works in KRaft mode. + @ParameterizedTest + @ValueSource(strings = Array("kraft")) + def testCreateTopicRetentionMsValidationWithRemoteCopyDisabled(quorum: String): Unit = { + val testTopicName2 = testTopicName + "2" + val testTopicName3 = testTopicName + "3" + val errorMsgMs = "When `remote.log.copy.disable` is set to true, the `local.retention.ms` and `retention.ms` " + + "must be set to the identical value because there will be no more logs copied to the remote storage." + + // 1. create a topic with `remote.log.copy.disable=true` and have different local.retention.ms and retention.ms value, + // it should fail to create the topic + val topicConfig = new Properties() + topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true") + topicConfig.put(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, "true") + topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, "100") + topicConfig.put(TopicConfig.RETENTION_MS_CONFIG, "1000") + topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "-2") + + val admin = createAdminClient() + val err = assertThrowsException(classOf[InvalidConfigurationException], + () => TestUtils.createTopicWithAdmin(admin, testTopicName, brokers, controllerServers, numPartitions, + numReplicationFactor, topicConfig = topicConfig)) + assertEquals(errorMsgMs, err.getMessage) + + // 2. change the local.retention.ms value to the same value as retention.ms should successfully create the topic + topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, "1000") + TestUtils.createTopicWithAdmin(admin, testTopicName, brokers, controllerServers, numPartitions, numReplicationFactor, + topicConfig = topicConfig) + + // 3. change the local.retention.ms value to "-2" should also successfully create the topic + topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, "-2") + TestUtils.createTopicWithAdmin(admin, testTopicName2, brokers, controllerServers, numPartitions, numReplicationFactor, + topicConfig = topicConfig) + + // 4. create a topic with `remote.log.copy.disable=false` and have different local.retention.ms and retention.ms value, + // it should successfully creates the topic. + topicConfig.clear() + topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true") + topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, "100") + topicConfig.put(TopicConfig.RETENTION_MS_CONFIG, "1000") + topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "-2") + TestUtils.createTopicWithAdmin(admin, testTopicName3, brokers, controllerServers, numPartitions, numReplicationFactor, + topicConfig = topicConfig) + + // 5. alter the config to `remote.log.copy.disable=true`, it should fail the config change + val configs = new util.HashMap[ConfigResource, util.Collection[AlterConfigOp]]() + configs.put(new ConfigResource(ConfigResource.Type.TOPIC, testTopicName3), + util.Arrays.asList( + new AlterConfigOp(new ConfigEntry(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, "true"), + AlterConfigOp.OpType.SET), + )) + val err2 = assertThrowsException(classOf[InvalidConfigurationException], + () => admin.incrementalAlterConfigs(configs).all().get()) + assertEquals(errorMsgMs, err2.getMessage) + + // 6. alter the config to `remote.log.copy.disable=true` and local.retention.ms == retention.ms, it should work without error + configs.put(new ConfigResource(ConfigResource.Type.TOPIC, testTopicName3), + util.Arrays.asList( + new AlterConfigOp(new ConfigEntry(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, "true"), + AlterConfigOp.OpType.SET), + new AlterConfigOp(new ConfigEntry(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, "1000"), + AlterConfigOp.OpType.SET), + )) + + admin.incrementalAlterConfigs(configs).all().get() + } + + @ParameterizedTest + @ValueSource(strings = Array("kraft")) + def testCreateTopicRetentionBytesValidationWithRemoteCopyDisabled(quorum: String): Unit = { + val testTopicName2 = testTopicName + "2" + val testTopicName3 = testTopicName + "3" + val errorMsgBytes = "When `remote.log.copy.disable` is set to true, the `local.retention.bytes` and `retention.bytes` " + + "must be set to the identical value because there will be no more logs copied to the remote storage." + + // 1. create a topic with `remote.log.copy.disable=true` and have different local.retention.bytes and retention.bytes value, + // it should fail to create the topic + val topicConfig = new Properties() + topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true") + topicConfig.put(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, "true") + topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "100") + topicConfig.put(TopicConfig.RETENTION_BYTES_CONFIG, "1000") + topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, "-2") + + val admin = createAdminClient() + val err = assertThrowsException(classOf[InvalidConfigurationException], + () => TestUtils.createTopicWithAdmin(admin, testTopicName, brokers, controllerServers, numPartitions, + numReplicationFactor, topicConfig = topicConfig)) + assertEquals(errorMsgBytes, err.getMessage) + + // 2. change the local.retention.bytes value to the same value as retention.bytes should successfully create the topic + topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "1000") + TestUtils.createTopicWithAdmin(admin, testTopicName, brokers, controllerServers, numPartitions, numReplicationFactor, + topicConfig = topicConfig) + + // 3. change the local.retention.bytes value to "-2" should also successfully create the topic + topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "-2") + TestUtils.createTopicWithAdmin(admin, testTopicName2, brokers, controllerServers, numPartitions, numReplicationFactor, + topicConfig = topicConfig) + + // 4. create a topic with `remote.log.copy.disable=false` and have different local.retention.bytes and retention.bytes value, + // it should successfully creates the topic. + topicConfig.clear() + topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true") + topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "100") + topicConfig.put(TopicConfig.RETENTION_BYTES_CONFIG, "1000") + topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, "-2") + TestUtils.createTopicWithAdmin(admin, testTopicName3, brokers, controllerServers, numPartitions, numReplicationFactor, + topicConfig = topicConfig) + + // 5. alter the config to `remote.log.copy.disable=true`, it should fail the config change + val configs = new util.HashMap[ConfigResource, util.Collection[AlterConfigOp]]() + configs.put(new ConfigResource(ConfigResource.Type.TOPIC, testTopicName3), + util.Arrays.asList( + new AlterConfigOp(new ConfigEntry(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, "true"), + AlterConfigOp.OpType.SET), + )) + val err2 = assertThrowsException(classOf[InvalidConfigurationException], + () => admin.incrementalAlterConfigs(configs).all().get()) + assertEquals(errorMsgBytes, err2.getMessage) + + // 6. alter the config to `remote.log.copy.disable=true` and local.retention.bytes == retention.bytes, it should work without error + configs.put(new ConfigResource(ConfigResource.Type.TOPIC, testTopicName3), + util.Arrays.asList( + new AlterConfigOp(new ConfigEntry(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, "true"), + AlterConfigOp.OpType.SET), + new AlterConfigOp(new ConfigEntry(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "1000"), + AlterConfigOp.OpType.SET), + )) + admin.incrementalAlterConfigs(configs).all().get() + } + @ParameterizedTest @ValueSource(strings = Array("zk", "kraft")) def testEnableRemoteLogOnExistingTopicTest(quorum: String): Unit = { diff --git a/core/src/test/scala/unit/kafka/log/LogTestUtils.scala b/core/src/test/scala/unit/kafka/log/LogTestUtils.scala index 706a5ec30f2..264dfb36cfe 100644 --- a/core/src/test/scala/unit/kafka/log/LogTestUtils.scala +++ b/core/src/test/scala/unit/kafka/log/LogTestUtils.scala @@ -36,6 +36,7 @@ import org.apache.kafka.coordinator.transaction.TransactionLogConfigs import org.apache.kafka.server.config.ServerLogConfigs import org.apache.kafka.server.util.Scheduler import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile +import org.apache.kafka.storage.internals.log.LogConfig.{DEFAULT_REMOTE_LOG_COPY_DISABLE_CONFIG, DEFAULT_REMOTE_LOG_DELETE_ON_DISABLE_CONFIG} import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, FetchDataInfo, FetchIsolation, LazyIndex, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogOffsetsListener, LogSegment, ProducerStateManager, ProducerStateManagerConfig, TransactionIndex} import scala.jdk.CollectionConverters._ @@ -68,7 +69,9 @@ object LogTestUtils { indexIntervalBytes: Int = ServerLogConfigs.LOG_INDEX_INTERVAL_BYTES_DEFAULT, segmentIndexBytes: Int = ServerLogConfigs.LOG_INDEX_SIZE_MAX_BYTES_DEFAULT, fileDeleteDelayMs: Long = ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT, - remoteLogStorageEnable: Boolean = LogConfig.DEFAULT_REMOTE_STORAGE_ENABLE): LogConfig = { + remoteLogStorageEnable: Boolean = LogConfig.DEFAULT_REMOTE_STORAGE_ENABLE, + remoteLogCopyDisable: Boolean = DEFAULT_REMOTE_LOG_COPY_DISABLE_CONFIG, + remoteLogDeleteOnDisable: Boolean = DEFAULT_REMOTE_LOG_DELETE_ON_DISABLE_CONFIG): LogConfig = { val logProps = new Properties() logProps.put(TopicConfig.SEGMENT_MS_CONFIG, segmentMs: java.lang.Long) logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, segmentBytes: Integer) @@ -83,6 +86,8 @@ object LogTestUtils { logProps.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, segmentIndexBytes: Integer) logProps.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, fileDeleteDelayMs: java.lang.Long) logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, remoteLogStorageEnable: java.lang.Boolean) + logProps.put(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, remoteLogCopyDisable: java.lang.Boolean) + logProps.put(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, remoteLogDeleteOnDisable: java.lang.Boolean) new LogConfig(logProps) } diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala index bea15ad4883..232c304be6a 100755 --- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala +++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala @@ -4188,6 +4188,88 @@ class UnifiedLogTest { assertEquals(1, log.logSegments.size) } + @Test + def testRetentionOnLocalLogDeletionWhenRemoteLogCopyDisabled(): Unit = { + def createRecords = TestUtils.records(List(new SimpleRecord(mockTime.milliseconds(), "a".getBytes))) + val segmentBytes = createRecords.sizeInBytes() + val logConfig = LogTestUtils.createLogConfig(segmentBytes = segmentBytes, localRetentionBytes = 1, retentionBytes = segmentBytes * 5, + fileDeleteDelayMs = 0, remoteLogStorageEnable = true) + val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true) + + // Given 6 segments of 1 message each + for (_ <- 0 until 6) { + log.appendAsLeader(createRecords, leaderEpoch = 0) + } + assertEquals(6, log.logSegments.size) + + log.updateHighWatermark(log.logEndOffset) + + // Should not delete local log because highest remote storage offset is -1 (default value) + log.deleteOldSegments() + assertEquals(6, log.logSegments.size()) + assertEquals(0, log.logStartOffset) + assertEquals(0, log.localLogStartOffset()) + + // simulate calls to upload 2 segments to remote storage + log.updateHighestOffsetInRemoteStorage(1) + + log.deleteOldSegments() + assertEquals(4, log.logSegments.size()) + assertEquals(0, log.logStartOffset) + assertEquals(2, log.localLogStartOffset()) + + // add remoteCopyDisabled = true + val copyDisabledLogConfig = LogTestUtils.createLogConfig(segmentBytes = segmentBytes, localRetentionBytes = 1, retentionBytes = segmentBytes * 5, + fileDeleteDelayMs = 0, remoteLogStorageEnable = true, remoteLogCopyDisable = true) + log.updateConfig(copyDisabledLogConfig) + + // No local logs will be deleted even though local retention bytes is 1 because we'll adopt retention.ms/bytes + // when remote.log.copy.disable = true + log.deleteOldSegments() + assertEquals(4, log.logSegments.size()) + assertEquals(0, log.logStartOffset) + assertEquals(2, log.localLogStartOffset()) + + // simulate the remote logs are all deleted due to retention policy + log.updateLogStartOffsetFromRemoteTier(2) + assertEquals(4, log.logSegments.size()) + assertEquals(2, log.logStartOffset) + assertEquals(2, log.localLogStartOffset()) + + // produce 3 more segments + for (_ <- 0 until 3) { + log.appendAsLeader(createRecords, leaderEpoch = 0) + } + assertEquals(7, log.logSegments.size) + log.updateHighWatermark(log.logEndOffset) + + // try to delete local logs again, 2 segments will be deleted this time because we'll adopt retention.ms/bytes (retention.bytes = 5) + // when remote.log.copy.disable = true + log.deleteOldSegments() + assertEquals(5, log.logSegments.size()) + assertEquals(4, log.logStartOffset) + assertEquals(4, log.localLogStartOffset()) + + // add localRetentionMs = 1, retentionMs = 1000 + val retentionMsConfig = LogTestUtils.createLogConfig(segmentBytes = segmentBytes, localRetentionMs = 1, retentionMs = 1000, + fileDeleteDelayMs = 0, remoteLogStorageEnable = true, remoteLogCopyDisable = true) + log.updateConfig(retentionMsConfig) + + // Should not delete any logs because no local logs expired using retention.ms = 1000 + mockTime.sleep(10) + log.deleteOldSegments() + assertEquals(5, log.logSegments.size()) + assertEquals(4, log.logStartOffset) + assertEquals(4, log.localLogStartOffset()) + + // Should delete all logs because all of them are expired based on retentionMs = 1000 + mockTime.sleep(1000) + log.deleteOldSegments() + assertEquals(1, log.logSegments.size()) + assertEquals(9, log.logStartOffset) + assertEquals(9, log.localLogStartOffset()) + } + @Test def testIncrementLocalLogStartOffsetAfterLocalLogDeletion(): Unit = { val logConfig = LogTestUtils.createLogConfig(localRetentionBytes = 1, fileDeleteDelayMs = 0, remoteLogStorageEnable = true) diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java index 267b4f15eb0..122c698bd1d 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java @@ -184,6 +184,8 @@ public class LogConfig extends AbstractConfig { public static final long DEFAULT_MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS = ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_DEFAULT; public static final boolean DEFAULT_REMOTE_STORAGE_ENABLE = false; + public static final boolean DEFAULT_REMOTE_LOG_COPY_DISABLE_CONFIG = false; + public static final boolean DEFAULT_REMOTE_LOG_DELETE_ON_DISABLE_CONFIG = false; public static final long DEFAULT_LOCAL_RETENTION_BYTES = -2; // It indicates the value to be derived from RetentionBytes public static final long DEFAULT_LOCAL_RETENTION_MS = -2; // It indicates the value to be derived from RetentionMs @@ -635,6 +637,7 @@ public class LogConfig extends AbstractConfig { validateNoRemoteStorageForCompactedTopic(newConfigs); validateRemoteStorageRetentionSize(newConfigs); validateRemoteStorageRetentionTime(newConfigs); + validateRetentionConfigsWhenRemoteCopyDisabled(newConfigs, isRemoteLogStorageEnabled); } else { // The new config "remote.storage.enable" is false, validate if it's turning from true to false boolean wasRemoteLogEnabled = Boolean.parseBoolean(existingConfigs.getOrDefault(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false")); @@ -651,6 +654,24 @@ public class LogConfig extends AbstractConfig { } } + public static void validateRetentionConfigsWhenRemoteCopyDisabled(Map newConfigs, boolean isRemoteLogStorageEnabled) { + boolean isRemoteLogCopyDisabled = (Boolean) Utils.castToStringObjectMap(newConfigs).getOrDefault(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, false); + long retentionMs = (Long) newConfigs.get(TopicConfig.RETENTION_MS_CONFIG); + long localRetentionMs = (Long) newConfigs.get(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG); + long retentionBytes = (Long) newConfigs.get(TopicConfig.RETENTION_BYTES_CONFIG); + long localRetentionBytes = (Long) newConfigs.get(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG); + if (isRemoteLogStorageEnabled && isRemoteLogCopyDisabled) { + if (localRetentionBytes != -2 && localRetentionBytes != retentionBytes) { + throw new InvalidConfigurationException("When `remote.log.copy.disable` is set to true, the `local.retention.bytes` " + + "and `retention.bytes` must be set to the identical value because there will be no more logs copied to the remote storage."); + } + if (localRetentionMs != -2 && localRetentionMs != retentionMs) { + throw new InvalidConfigurationException("When `remote.log.copy.disable` is set to true, the `local.retention.ms` " + + "and `retention.ms` must be set to the identical value because there will be no more logs copied to the remote storage."); + } + } + } + public static void validateNoInvalidRemoteStorageConfigsInZK(Map newConfigs) { boolean isRemoteLogDeleteOnDisable = (Boolean) Utils.castToStringObjectMap(newConfigs).getOrDefault(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, false); boolean isRemoteLogCopyDisabled = (Boolean) Utils.castToStringObjectMap(newConfigs).getOrDefault(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, false); diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/DisableRemoteLogOnTopicTest.java b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/DisableRemoteLogOnTopicTest.java index 6bcaea6a363..2655f600b3b 100644 --- a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/DisableRemoteLogOnTopicTest.java +++ b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/DisableRemoteLogOnTopicTest.java @@ -60,8 +60,16 @@ public final class DisableRemoteLogOnTopicTest extends TieredStorageTestHarness final Map> assignment = mkMap( mkEntry(p0, Arrays.asList(broker0, broker1)) ); - final Map disableCopy = new HashMap<>(); - disableCopy.put(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, "true"); + // local.retention.ms/bytes need to set to the same value as retention.ms/bytes when disabling remote log copy + final Map disableRemoteCopy = new HashMap<>(); + disableRemoteCopy.put(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, "true"); + disableRemoteCopy.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "-2"); + disableRemoteCopy.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, "-2"); + + // revert the change to local.retention.bytes + final Map enableRemoteCopy = new HashMap<>(); + enableRemoteCopy.put(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, "false"); + enableRemoteCopy.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "1"); final Map deleteOnDisable = new HashMap<>(); deleteOnDisable.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false"); @@ -78,7 +86,7 @@ public final class DisableRemoteLogOnTopicTest extends TieredStorageTestHarness new KeyValueSpec("k2", "v2")) // disable remote log copy .updateTopicConfig(topicA, - Collections.singletonMap(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, "true"), + disableRemoteCopy, Collections.emptyList()) // make sure we can still consume from the beginning of the topic to read data from local and remote storage @@ -87,16 +95,16 @@ public final class DisableRemoteLogOnTopicTest extends TieredStorageTestHarness // re-enable remote log copy .updateTopicConfig(topicA, - Collections.singletonMap(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, "false"), + enableRemoteCopy, Collections.emptyList()) // make sure the logs can be offloaded .expectEarliestLocalOffsetInLogDirectory(topicA, p0, 3L) .produce(topicA, p0, new KeyValueSpec("k3", "v3")) - // explicitly disable remote log copy + // disable remote log copy again .updateTopicConfig(topicA, - disableCopy, + disableRemoteCopy, Collections.emptyList()) // make sure we can still consume from the beginning of the topic to read data from local and remote storage .expectFetchFromTieredStorage(broker0, topicA, p0, 3)