KAFKA-17236: Handle local log deletion when remote.log.copy.disabled=true (#16765)

Handle local log deletion when remote.log.copy.disabled=true based on the KIP-950.

When tiered storage is disabled or becomes read-only on a topic, the local retention configuration becomes irrelevant, and all data expiration follows the topic-wide retention configuration exclusively.

- added remoteLogEnabledAndRemoteCopyEnabled method to check if this topic enables tiered storage and remote log copy is enabled. We should adopt local.retention.ms/bytes when remote.storage.enable=true,remote.log.copy.disable=false.
- Changed to use retention.bytes/retention.ms when remote copy disabled.
- Added validation to ask users to set local.retention.ms == retention.ms and local.retention.bytes == retention.bytes
- Added tests

Reviewers: Kamal Chandraprakash<kamal.chandraprakash@gmail.com>, Satish Duggana <satishd@apache.org>, Christo Lolov <lolovc@amazon.com>
This commit is contained in:
Luke Chen 2024-08-08 20:37:40 +09:00
parent dd5e7a8291
commit 7fe3cec4eb
8 changed files with 275 additions and 16 deletions

View File

@ -95,7 +95,9 @@ public class TopicConfig {
public static final String REMOTE_LOG_COPY_DISABLE_CONFIG = "remote.log.copy.disable";
public static final String REMOTE_LOG_COPY_DISABLE_DOC = "Determines whether tiered data for a topic should become read only," +
" and no more data uploading on a topic.";
" and no more data uploading on a topic. Once this config is set to true, the local retention configuration " +
"(i.e. local.retention.ms/bytes) becomes irrelevant, and all data expiration follows the topic-wide retention configuration" +
"(i.e. retention.ms/bytes).";
public static final String REMOTE_LOG_DELETE_ON_DISABLE_CONFIG = "remote.log.delete.on.disable";
public static final String REMOTE_LOG_DELETE_ON_DISABLE_DOC = "Determines whether tiered data for a topic should be " +

View File

@ -981,6 +981,7 @@ class LogManager(logDirs: Seq[File],
LogConfig.validateNoInvalidRemoteStorageConfigsInZK(newLogConfig.values())
}
LogConfig.validateTurningOffRemoteStorageWithDelete(newLogConfig.values(), wasRemoteLogEnabled, isRemoteLogStorageEnabled)
LogConfig.validateRetentionConfigsWhenRemoteCopyDisabled(newLogConfig.values(), isRemoteLogStorageEnabled)
if (logs.nonEmpty) {
logs.foreach { log =>
val oldLogConfig = log.updateConfig(newLogConfig)

View File

@ -1463,6 +1463,13 @@ class UnifiedLog(@volatile var logStartOffset: Long,
}
}
/**
* @return true if this topic enables tiered storage and remote log copy is enabled (i.e. remote.log.copy.disable=false)
*/
private def remoteLogEnabledAndRemoteCopyEnabled(): Boolean = {
remoteLogEnabled() && !config.remoteLogCopyDisable()
}
/**
* Find segments starting from the oldest until the user-supplied predicate is false.
* A final segment that is empty will never be returned.
@ -1477,7 +1484,8 @@ class UnifiedLog(@volatile var logStartOffset: Long,
// Segments are eligible for deletion when:
// 1. they are uploaded to the remote storage
// 2. log-start-offset was incremented higher than the largest offset in the candidate segment
if (remoteLogEnabled()) {
// Note: when remote log copy is disabled, we will fall back to local log check using retention.ms/bytes
if (remoteLogEnabledAndRemoteCopyEnabled()) {
(upperBoundOffset > 0 && upperBoundOffset - 1 <= highestOffsetInRemoteStorage()) ||
allowDeletionDueToLogStartOffsetIncremented
} else {
@ -1522,7 +1530,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
}
private def incrementStartOffset(startOffset: Long, reason: LogStartOffsetIncrementReason): Unit = {
if (remoteLogEnabled()) maybeIncrementLocalLogStartOffset(startOffset, reason)
if (remoteLogEnabledAndRemoteCopyEnabled()) maybeIncrementLocalLogStartOffset(startOffset, reason)
else maybeIncrementLogStartOffset(startOffset, reason)
}
@ -1570,7 +1578,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
}
private def deleteRetentionMsBreachedSegments(): Int = {
val retentionMs = localRetentionMs(config, remoteLogEnabled())
val retentionMs = localRetentionMs(config, remoteLogEnabledAndRemoteCopyEnabled())
if (retentionMs < 0) return 0
val startMs = time.milliseconds
@ -1582,7 +1590,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
}
private def deleteRetentionSizeBreachedSegments(): Int = {
val retentionSize: Long = localRetentionSize(config, remoteLogEnabled())
val retentionSize: Long = localRetentionSize(config, remoteLogEnabledAndRemoteCopyEnabled())
if (retentionSize < 0 || size < retentionSize) return 0
var diff = size - retentionSize
def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): Boolean = {
@ -2338,12 +2346,12 @@ object UnifiedLog extends Logging {
}
}
private[log] def localRetentionMs(config: LogConfig, remoteLogEnabled: Boolean): Long = {
if (remoteLogEnabled) config.remoteLogConfig.localRetentionMs else config.retentionMs
private[log] def localRetentionMs(config: LogConfig, remoteLogEnabledAndRemoteCopyEnabled: Boolean): Long = {
if (remoteLogEnabledAndRemoteCopyEnabled) config.remoteLogConfig.localRetentionMs else config.retentionMs
}
private[log] def localRetentionSize(config: LogConfig, remoteLogEnabled: Boolean): Long = {
if (remoteLogEnabled) config.remoteLogConfig.localRetentionBytes else config.retentionSize
private[log] def localRetentionSize(config: LogConfig, remoteLogEnabledAndRemoteCopyEnabled: Boolean): Long = {
if (remoteLogEnabledAndRemoteCopyEnabled) config.remoteLogConfig.localRetentionBytes else config.retentionSize
}
}

View File

@ -168,6 +168,138 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
verifyRemoteLogTopicConfigs(topicConfig)
}
// `remote.log.delete.on.disable` only works in KRaft mode.
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testCreateTopicRetentionMsValidationWithRemoteCopyDisabled(quorum: String): Unit = {
val testTopicName2 = testTopicName + "2"
val testTopicName3 = testTopicName + "3"
val errorMsgMs = "When `remote.log.copy.disable` is set to true, the `local.retention.ms` and `retention.ms` " +
"must be set to the identical value because there will be no more logs copied to the remote storage."
// 1. create a topic with `remote.log.copy.disable=true` and have different local.retention.ms and retention.ms value,
// it should fail to create the topic
val topicConfig = new Properties()
topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
topicConfig.put(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, "true")
topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, "100")
topicConfig.put(TopicConfig.RETENTION_MS_CONFIG, "1000")
topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "-2")
val admin = createAdminClient()
val err = assertThrowsException(classOf[InvalidConfigurationException],
() => TestUtils.createTopicWithAdmin(admin, testTopicName, brokers, controllerServers, numPartitions,
numReplicationFactor, topicConfig = topicConfig))
assertEquals(errorMsgMs, err.getMessage)
// 2. change the local.retention.ms value to the same value as retention.ms should successfully create the topic
topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, "1000")
TestUtils.createTopicWithAdmin(admin, testTopicName, brokers, controllerServers, numPartitions, numReplicationFactor,
topicConfig = topicConfig)
// 3. change the local.retention.ms value to "-2" should also successfully create the topic
topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, "-2")
TestUtils.createTopicWithAdmin(admin, testTopicName2, brokers, controllerServers, numPartitions, numReplicationFactor,
topicConfig = topicConfig)
// 4. create a topic with `remote.log.copy.disable=false` and have different local.retention.ms and retention.ms value,
// it should successfully creates the topic.
topicConfig.clear()
topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, "100")
topicConfig.put(TopicConfig.RETENTION_MS_CONFIG, "1000")
topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "-2")
TestUtils.createTopicWithAdmin(admin, testTopicName3, brokers, controllerServers, numPartitions, numReplicationFactor,
topicConfig = topicConfig)
// 5. alter the config to `remote.log.copy.disable=true`, it should fail the config change
val configs = new util.HashMap[ConfigResource, util.Collection[AlterConfigOp]]()
configs.put(new ConfigResource(ConfigResource.Type.TOPIC, testTopicName3),
util.Arrays.asList(
new AlterConfigOp(new ConfigEntry(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, "true"),
AlterConfigOp.OpType.SET),
))
val err2 = assertThrowsException(classOf[InvalidConfigurationException],
() => admin.incrementalAlterConfigs(configs).all().get())
assertEquals(errorMsgMs, err2.getMessage)
// 6. alter the config to `remote.log.copy.disable=true` and local.retention.ms == retention.ms, it should work without error
configs.put(new ConfigResource(ConfigResource.Type.TOPIC, testTopicName3),
util.Arrays.asList(
new AlterConfigOp(new ConfigEntry(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, "true"),
AlterConfigOp.OpType.SET),
new AlterConfigOp(new ConfigEntry(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, "1000"),
AlterConfigOp.OpType.SET),
))
admin.incrementalAlterConfigs(configs).all().get()
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testCreateTopicRetentionBytesValidationWithRemoteCopyDisabled(quorum: String): Unit = {
val testTopicName2 = testTopicName + "2"
val testTopicName3 = testTopicName + "3"
val errorMsgBytes = "When `remote.log.copy.disable` is set to true, the `local.retention.bytes` and `retention.bytes` " +
"must be set to the identical value because there will be no more logs copied to the remote storage."
// 1. create a topic with `remote.log.copy.disable=true` and have different local.retention.bytes and retention.bytes value,
// it should fail to create the topic
val topicConfig = new Properties()
topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
topicConfig.put(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, "true")
topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "100")
topicConfig.put(TopicConfig.RETENTION_BYTES_CONFIG, "1000")
topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, "-2")
val admin = createAdminClient()
val err = assertThrowsException(classOf[InvalidConfigurationException],
() => TestUtils.createTopicWithAdmin(admin, testTopicName, brokers, controllerServers, numPartitions,
numReplicationFactor, topicConfig = topicConfig))
assertEquals(errorMsgBytes, err.getMessage)
// 2. change the local.retention.bytes value to the same value as retention.bytes should successfully create the topic
topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "1000")
TestUtils.createTopicWithAdmin(admin, testTopicName, brokers, controllerServers, numPartitions, numReplicationFactor,
topicConfig = topicConfig)
// 3. change the local.retention.bytes value to "-2" should also successfully create the topic
topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "-2")
TestUtils.createTopicWithAdmin(admin, testTopicName2, brokers, controllerServers, numPartitions, numReplicationFactor,
topicConfig = topicConfig)
// 4. create a topic with `remote.log.copy.disable=false` and have different local.retention.bytes and retention.bytes value,
// it should successfully creates the topic.
topicConfig.clear()
topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "100")
topicConfig.put(TopicConfig.RETENTION_BYTES_CONFIG, "1000")
topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, "-2")
TestUtils.createTopicWithAdmin(admin, testTopicName3, brokers, controllerServers, numPartitions, numReplicationFactor,
topicConfig = topicConfig)
// 5. alter the config to `remote.log.copy.disable=true`, it should fail the config change
val configs = new util.HashMap[ConfigResource, util.Collection[AlterConfigOp]]()
configs.put(new ConfigResource(ConfigResource.Type.TOPIC, testTopicName3),
util.Arrays.asList(
new AlterConfigOp(new ConfigEntry(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, "true"),
AlterConfigOp.OpType.SET),
))
val err2 = assertThrowsException(classOf[InvalidConfigurationException],
() => admin.incrementalAlterConfigs(configs).all().get())
assertEquals(errorMsgBytes, err2.getMessage)
// 6. alter the config to `remote.log.copy.disable=true` and local.retention.bytes == retention.bytes, it should work without error
configs.put(new ConfigResource(ConfigResource.Type.TOPIC, testTopicName3),
util.Arrays.asList(
new AlterConfigOp(new ConfigEntry(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, "true"),
AlterConfigOp.OpType.SET),
new AlterConfigOp(new ConfigEntry(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "1000"),
AlterConfigOp.OpType.SET),
))
admin.incrementalAlterConfigs(configs).all().get()
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testEnableRemoteLogOnExistingTopicTest(quorum: String): Unit = {

View File

@ -36,6 +36,7 @@ import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
import org.apache.kafka.server.config.ServerLogConfigs
import org.apache.kafka.server.util.Scheduler
import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile
import org.apache.kafka.storage.internals.log.LogConfig.{DEFAULT_REMOTE_LOG_COPY_DISABLE_CONFIG, DEFAULT_REMOTE_LOG_DELETE_ON_DISABLE_CONFIG}
import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, FetchDataInfo, FetchIsolation, LazyIndex, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogOffsetsListener, LogSegment, ProducerStateManager, ProducerStateManagerConfig, TransactionIndex}
import scala.jdk.CollectionConverters._
@ -68,7 +69,9 @@ object LogTestUtils {
indexIntervalBytes: Int = ServerLogConfigs.LOG_INDEX_INTERVAL_BYTES_DEFAULT,
segmentIndexBytes: Int = ServerLogConfigs.LOG_INDEX_SIZE_MAX_BYTES_DEFAULT,
fileDeleteDelayMs: Long = ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT,
remoteLogStorageEnable: Boolean = LogConfig.DEFAULT_REMOTE_STORAGE_ENABLE): LogConfig = {
remoteLogStorageEnable: Boolean = LogConfig.DEFAULT_REMOTE_STORAGE_ENABLE,
remoteLogCopyDisable: Boolean = DEFAULT_REMOTE_LOG_COPY_DISABLE_CONFIG,
remoteLogDeleteOnDisable: Boolean = DEFAULT_REMOTE_LOG_DELETE_ON_DISABLE_CONFIG): LogConfig = {
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_MS_CONFIG, segmentMs: java.lang.Long)
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, segmentBytes: Integer)
@ -83,6 +86,8 @@ object LogTestUtils {
logProps.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, segmentIndexBytes: Integer)
logProps.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, fileDeleteDelayMs: java.lang.Long)
logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, remoteLogStorageEnable: java.lang.Boolean)
logProps.put(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, remoteLogCopyDisable: java.lang.Boolean)
logProps.put(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, remoteLogDeleteOnDisable: java.lang.Boolean)
new LogConfig(logProps)
}

View File

@ -4187,6 +4187,88 @@ class UnifiedLogTest {
assertEquals(1, log.logSegments.size)
}
@Test
def testRetentionOnLocalLogDeletionWhenRemoteLogCopyDisabled(): Unit = {
def createRecords = TestUtils.records(List(new SimpleRecord(mockTime.milliseconds(), "a".getBytes)))
val segmentBytes = createRecords.sizeInBytes()
val logConfig = LogTestUtils.createLogConfig(segmentBytes = segmentBytes, localRetentionBytes = 1, retentionBytes = segmentBytes * 5,
fileDeleteDelayMs = 0, remoteLogStorageEnable = true)
val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true)
// Given 6 segments of 1 message each
for (_ <- 0 until 6) {
log.appendAsLeader(createRecords, leaderEpoch = 0)
}
assertEquals(6, log.logSegments.size)
log.updateHighWatermark(log.logEndOffset)
// Should not delete local log because highest remote storage offset is -1 (default value)
log.deleteOldSegments()
assertEquals(6, log.logSegments.size())
assertEquals(0, log.logStartOffset)
assertEquals(0, log.localLogStartOffset())
// simulate calls to upload 2 segments to remote storage
log.updateHighestOffsetInRemoteStorage(1)
log.deleteOldSegments()
assertEquals(4, log.logSegments.size())
assertEquals(0, log.logStartOffset)
assertEquals(2, log.localLogStartOffset())
// add remoteCopyDisabled = true
val copyDisabledLogConfig = LogTestUtils.createLogConfig(segmentBytes = segmentBytes, localRetentionBytes = 1, retentionBytes = segmentBytes * 5,
fileDeleteDelayMs = 0, remoteLogStorageEnable = true, remoteLogCopyDisable = true)
log.updateConfig(copyDisabledLogConfig)
// No local logs will be deleted even though local retention bytes is 1 because we'll adopt retention.ms/bytes
// when remote.log.copy.disable = true
log.deleteOldSegments()
assertEquals(4, log.logSegments.size())
assertEquals(0, log.logStartOffset)
assertEquals(2, log.localLogStartOffset())
// simulate the remote logs are all deleted due to retention policy
log.updateLogStartOffsetFromRemoteTier(2)
assertEquals(4, log.logSegments.size())
assertEquals(2, log.logStartOffset)
assertEquals(2, log.localLogStartOffset())
// produce 3 more segments
for (_ <- 0 until 3) {
log.appendAsLeader(createRecords, leaderEpoch = 0)
}
assertEquals(7, log.logSegments.size)
log.updateHighWatermark(log.logEndOffset)
// try to delete local logs again, 2 segments will be deleted this time because we'll adopt retention.ms/bytes (retention.bytes = 5)
// when remote.log.copy.disable = true
log.deleteOldSegments()
assertEquals(5, log.logSegments.size())
assertEquals(4, log.logStartOffset)
assertEquals(4, log.localLogStartOffset())
// add localRetentionMs = 1, retentionMs = 1000
val retentionMsConfig = LogTestUtils.createLogConfig(segmentBytes = segmentBytes, localRetentionMs = 1, retentionMs = 1000,
fileDeleteDelayMs = 0, remoteLogStorageEnable = true, remoteLogCopyDisable = true)
log.updateConfig(retentionMsConfig)
// Should not delete any logs because no local logs expired using retention.ms = 1000
mockTime.sleep(10)
log.deleteOldSegments()
assertEquals(5, log.logSegments.size())
assertEquals(4, log.logStartOffset)
assertEquals(4, log.localLogStartOffset())
// Should delete all logs because all of them are expired based on retentionMs = 1000
mockTime.sleep(1000)
log.deleteOldSegments()
assertEquals(1, log.logSegments.size())
assertEquals(9, log.logStartOffset)
assertEquals(9, log.localLogStartOffset())
}
@Test
def testIncrementLocalLogStartOffsetAfterLocalLogDeletion(): Unit = {
val logConfig = LogTestUtils.createLogConfig(localRetentionBytes = 1, fileDeleteDelayMs = 0, remoteLogStorageEnable = true)

View File

@ -184,6 +184,8 @@ public class LogConfig extends AbstractConfig {
public static final long DEFAULT_MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS = ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_DEFAULT;
public static final boolean DEFAULT_REMOTE_STORAGE_ENABLE = false;
public static final boolean DEFAULT_REMOTE_LOG_COPY_DISABLE_CONFIG = false;
public static final boolean DEFAULT_REMOTE_LOG_DELETE_ON_DISABLE_CONFIG = false;
public static final long DEFAULT_LOCAL_RETENTION_BYTES = -2; // It indicates the value to be derived from RetentionBytes
public static final long DEFAULT_LOCAL_RETENTION_MS = -2; // It indicates the value to be derived from RetentionMs
@ -635,6 +637,7 @@ public class LogConfig extends AbstractConfig {
validateNoRemoteStorageForCompactedTopic(newConfigs);
validateRemoteStorageRetentionSize(newConfigs);
validateRemoteStorageRetentionTime(newConfigs);
validateRetentionConfigsWhenRemoteCopyDisabled(newConfigs, isRemoteLogStorageEnabled);
} else {
// The new config "remote.storage.enable" is false, validate if it's turning from true to false
boolean wasRemoteLogEnabled = Boolean.parseBoolean(existingConfigs.getOrDefault(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false"));
@ -651,6 +654,24 @@ public class LogConfig extends AbstractConfig {
}
}
public static void validateRetentionConfigsWhenRemoteCopyDisabled(Map<?, ?> newConfigs, boolean isRemoteLogStorageEnabled) {
boolean isRemoteLogCopyDisabled = (Boolean) Utils.castToStringObjectMap(newConfigs).getOrDefault(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, false);
long retentionMs = (Long) newConfigs.get(TopicConfig.RETENTION_MS_CONFIG);
long localRetentionMs = (Long) newConfigs.get(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG);
long retentionBytes = (Long) newConfigs.get(TopicConfig.RETENTION_BYTES_CONFIG);
long localRetentionBytes = (Long) newConfigs.get(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG);
if (isRemoteLogStorageEnabled && isRemoteLogCopyDisabled) {
if (localRetentionBytes != -2 && localRetentionBytes != retentionBytes) {
throw new InvalidConfigurationException("When `remote.log.copy.disable` is set to true, the `local.retention.bytes` " +
"and `retention.bytes` must be set to the identical value because there will be no more logs copied to the remote storage.");
}
if (localRetentionMs != -2 && localRetentionMs != retentionMs) {
throw new InvalidConfigurationException("When `remote.log.copy.disable` is set to true, the `local.retention.ms` " +
"and `retention.ms` must be set to the identical value because there will be no more logs copied to the remote storage.");
}
}
}
public static void validateNoInvalidRemoteStorageConfigsInZK(Map<?, ?> newConfigs) {
boolean isRemoteLogDeleteOnDisable = (Boolean) Utils.castToStringObjectMap(newConfigs).getOrDefault(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, false);
boolean isRemoteLogCopyDisabled = (Boolean) Utils.castToStringObjectMap(newConfigs).getOrDefault(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, false);

View File

@ -60,8 +60,16 @@ public final class DisableRemoteLogOnTopicTest extends TieredStorageTestHarness
final Map<Integer, List<Integer>> assignment = mkMap(
mkEntry(p0, Arrays.asList(broker0, broker1))
);
final Map<String, String> disableCopy = new HashMap<>();
disableCopy.put(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, "true");
// local.retention.ms/bytes need to set to the same value as retention.ms/bytes when disabling remote log copy
final Map<String, String> disableRemoteCopy = new HashMap<>();
disableRemoteCopy.put(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, "true");
disableRemoteCopy.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "-2");
disableRemoteCopy.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, "-2");
// revert the change to local.retention.bytes
final Map<String, String> enableRemoteCopy = new HashMap<>();
enableRemoteCopy.put(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, "false");
enableRemoteCopy.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "1");
final Map<String, String> deleteOnDisable = new HashMap<>();
deleteOnDisable.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false");
@ -78,7 +86,7 @@ public final class DisableRemoteLogOnTopicTest extends TieredStorageTestHarness
new KeyValueSpec("k2", "v2"))
// disable remote log copy
.updateTopicConfig(topicA,
Collections.singletonMap(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, "true"),
disableRemoteCopy,
Collections.emptyList())
// make sure we can still consume from the beginning of the topic to read data from local and remote storage
@ -87,16 +95,16 @@ public final class DisableRemoteLogOnTopicTest extends TieredStorageTestHarness
// re-enable remote log copy
.updateTopicConfig(topicA,
Collections.singletonMap(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, "false"),
enableRemoteCopy,
Collections.emptyList())
// make sure the logs can be offloaded
.expectEarliestLocalOffsetInLogDirectory(topicA, p0, 3L)
.produce(topicA, p0, new KeyValueSpec("k3", "v3"))
// explicitly disable remote log copy
// disable remote log copy again
.updateTopicConfig(topicA,
disableCopy,
disableRemoteCopy,
Collections.emptyList())
// make sure we can still consume from the beginning of the topic to read data from local and remote storage
.expectFetchFromTieredStorage(broker0, topicA, p0, 3)