mirror of https://github.com/apache/kafka.git
KAFKA-17236: Handle local log deletion when remote.log.copy.disabled=true (#16765)
Handle local log deletion when remote.log.copy.disabled=true based on the KIP-950. When tiered storage is disabled or becomes read-only on a topic, the local retention configuration becomes irrelevant, and all data expiration follows the topic-wide retention configuration exclusively. - added remoteLogEnabledAndRemoteCopyEnabled method to check if this topic enables tiered storage and remote log copy is enabled. We should adopt local.retention.ms/bytes when remote.storage.enable=true,remote.log.copy.disable=false. - Changed to use retention.bytes/retention.ms when remote copy disabled. - Added validation to ask users to set local.retention.ms == retention.ms and local.retention.bytes == retention.bytes - Added tests Reviewers: Kamal Chandraprakash<kamal.chandraprakash@gmail.com>, Satish Duggana <satishd@apache.org>, Christo Lolov <lolovc@amazon.com>
This commit is contained in:
parent
786d2c9975
commit
164f899605
|
@ -95,7 +95,9 @@ public class TopicConfig {
|
||||||
|
|
||||||
public static final String REMOTE_LOG_COPY_DISABLE_CONFIG = "remote.log.copy.disable";
|
public static final String REMOTE_LOG_COPY_DISABLE_CONFIG = "remote.log.copy.disable";
|
||||||
public static final String REMOTE_LOG_COPY_DISABLE_DOC = "Determines whether tiered data for a topic should become read only," +
|
public static final String REMOTE_LOG_COPY_DISABLE_DOC = "Determines whether tiered data for a topic should become read only," +
|
||||||
" and no more data uploading on a topic.";
|
" and no more data uploading on a topic. Once this config is set to true, the local retention configuration " +
|
||||||
|
"(i.e. local.retention.ms/bytes) becomes irrelevant, and all data expiration follows the topic-wide retention configuration" +
|
||||||
|
"(i.e. retention.ms/bytes).";
|
||||||
|
|
||||||
public static final String REMOTE_LOG_DELETE_ON_DISABLE_CONFIG = "remote.log.delete.on.disable";
|
public static final String REMOTE_LOG_DELETE_ON_DISABLE_CONFIG = "remote.log.delete.on.disable";
|
||||||
public static final String REMOTE_LOG_DELETE_ON_DISABLE_DOC = "Determines whether tiered data for a topic should be " +
|
public static final String REMOTE_LOG_DELETE_ON_DISABLE_DOC = "Determines whether tiered data for a topic should be " +
|
||||||
|
|
|
@ -981,6 +981,7 @@ class LogManager(logDirs: Seq[File],
|
||||||
LogConfig.validateNoInvalidRemoteStorageConfigsInZK(newLogConfig.values())
|
LogConfig.validateNoInvalidRemoteStorageConfigsInZK(newLogConfig.values())
|
||||||
}
|
}
|
||||||
LogConfig.validateTurningOffRemoteStorageWithDelete(newLogConfig.values(), wasRemoteLogEnabled, isRemoteLogStorageEnabled)
|
LogConfig.validateTurningOffRemoteStorageWithDelete(newLogConfig.values(), wasRemoteLogEnabled, isRemoteLogStorageEnabled)
|
||||||
|
LogConfig.validateRetentionConfigsWhenRemoteCopyDisabled(newLogConfig.values(), isRemoteLogStorageEnabled)
|
||||||
if (logs.nonEmpty) {
|
if (logs.nonEmpty) {
|
||||||
logs.foreach { log =>
|
logs.foreach { log =>
|
||||||
val oldLogConfig = log.updateConfig(newLogConfig)
|
val oldLogConfig = log.updateConfig(newLogConfig)
|
||||||
|
|
|
@ -1459,6 +1459,13 @@ class UnifiedLog(@volatile var logStartOffset: Long,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return true if this topic enables tiered storage and remote log copy is enabled (i.e. remote.log.copy.disable=false)
|
||||||
|
*/
|
||||||
|
private def remoteLogEnabledAndRemoteCopyEnabled(): Boolean = {
|
||||||
|
remoteLogEnabled() && !config.remoteLogCopyDisable()
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Find segments starting from the oldest until the user-supplied predicate is false.
|
* Find segments starting from the oldest until the user-supplied predicate is false.
|
||||||
* A final segment that is empty will never be returned.
|
* A final segment that is empty will never be returned.
|
||||||
|
@ -1473,7 +1480,8 @@ class UnifiedLog(@volatile var logStartOffset: Long,
|
||||||
// Segments are eligible for deletion when:
|
// Segments are eligible for deletion when:
|
||||||
// 1. they are uploaded to the remote storage
|
// 1. they are uploaded to the remote storage
|
||||||
// 2. log-start-offset was incremented higher than the largest offset in the candidate segment
|
// 2. log-start-offset was incremented higher than the largest offset in the candidate segment
|
||||||
if (remoteLogEnabled()) {
|
// Note: when remote log copy is disabled, we will fall back to local log check using retention.ms/bytes
|
||||||
|
if (remoteLogEnabledAndRemoteCopyEnabled()) {
|
||||||
(upperBoundOffset > 0 && upperBoundOffset - 1 <= highestOffsetInRemoteStorage()) ||
|
(upperBoundOffset > 0 && upperBoundOffset - 1 <= highestOffsetInRemoteStorage()) ||
|
||||||
allowDeletionDueToLogStartOffsetIncremented
|
allowDeletionDueToLogStartOffsetIncremented
|
||||||
} else {
|
} else {
|
||||||
|
@ -1518,7 +1526,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
|
||||||
}
|
}
|
||||||
|
|
||||||
private def incrementStartOffset(startOffset: Long, reason: LogStartOffsetIncrementReason): Unit = {
|
private def incrementStartOffset(startOffset: Long, reason: LogStartOffsetIncrementReason): Unit = {
|
||||||
if (remoteLogEnabled()) maybeIncrementLocalLogStartOffset(startOffset, reason)
|
if (remoteLogEnabledAndRemoteCopyEnabled()) maybeIncrementLocalLogStartOffset(startOffset, reason)
|
||||||
else maybeIncrementLogStartOffset(startOffset, reason)
|
else maybeIncrementLogStartOffset(startOffset, reason)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1566,7 +1574,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
|
||||||
}
|
}
|
||||||
|
|
||||||
private def deleteRetentionMsBreachedSegments(): Int = {
|
private def deleteRetentionMsBreachedSegments(): Int = {
|
||||||
val retentionMs = localRetentionMs(config, remoteLogEnabled())
|
val retentionMs = localRetentionMs(config, remoteLogEnabledAndRemoteCopyEnabled())
|
||||||
if (retentionMs < 0) return 0
|
if (retentionMs < 0) return 0
|
||||||
val startMs = time.milliseconds
|
val startMs = time.milliseconds
|
||||||
|
|
||||||
|
@ -1578,7 +1586,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
|
||||||
}
|
}
|
||||||
|
|
||||||
private def deleteRetentionSizeBreachedSegments(): Int = {
|
private def deleteRetentionSizeBreachedSegments(): Int = {
|
||||||
val retentionSize: Long = localRetentionSize(config, remoteLogEnabled())
|
val retentionSize: Long = localRetentionSize(config, remoteLogEnabledAndRemoteCopyEnabled())
|
||||||
if (retentionSize < 0 || size < retentionSize) return 0
|
if (retentionSize < 0 || size < retentionSize) return 0
|
||||||
var diff = size - retentionSize
|
var diff = size - retentionSize
|
||||||
def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): Boolean = {
|
def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): Boolean = {
|
||||||
|
@ -2334,12 +2342,12 @@ object UnifiedLog extends Logging {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private[log] def localRetentionMs(config: LogConfig, remoteLogEnabled: Boolean): Long = {
|
private[log] def localRetentionMs(config: LogConfig, remoteLogEnabledAndRemoteCopyEnabled: Boolean): Long = {
|
||||||
if (remoteLogEnabled) config.remoteLogConfig.localRetentionMs else config.retentionMs
|
if (remoteLogEnabledAndRemoteCopyEnabled) config.remoteLogConfig.localRetentionMs else config.retentionMs
|
||||||
}
|
}
|
||||||
|
|
||||||
private[log] def localRetentionSize(config: LogConfig, remoteLogEnabled: Boolean): Long = {
|
private[log] def localRetentionSize(config: LogConfig, remoteLogEnabledAndRemoteCopyEnabled: Boolean): Long = {
|
||||||
if (remoteLogEnabled) config.remoteLogConfig.localRetentionBytes else config.retentionSize
|
if (remoteLogEnabledAndRemoteCopyEnabled) config.remoteLogConfig.localRetentionBytes else config.retentionSize
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -168,6 +168,138 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
|
||||||
verifyRemoteLogTopicConfigs(topicConfig)
|
verifyRemoteLogTopicConfigs(topicConfig)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// `remote.log.delete.on.disable` only works in KRaft mode.
|
||||||
|
@ParameterizedTest
|
||||||
|
@ValueSource(strings = Array("kraft"))
|
||||||
|
def testCreateTopicRetentionMsValidationWithRemoteCopyDisabled(quorum: String): Unit = {
|
||||||
|
val testTopicName2 = testTopicName + "2"
|
||||||
|
val testTopicName3 = testTopicName + "3"
|
||||||
|
val errorMsgMs = "When `remote.log.copy.disable` is set to true, the `local.retention.ms` and `retention.ms` " +
|
||||||
|
"must be set to the identical value because there will be no more logs copied to the remote storage."
|
||||||
|
|
||||||
|
// 1. create a topic with `remote.log.copy.disable=true` and have different local.retention.ms and retention.ms value,
|
||||||
|
// it should fail to create the topic
|
||||||
|
val topicConfig = new Properties()
|
||||||
|
topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
|
||||||
|
topicConfig.put(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, "true")
|
||||||
|
topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, "100")
|
||||||
|
topicConfig.put(TopicConfig.RETENTION_MS_CONFIG, "1000")
|
||||||
|
topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "-2")
|
||||||
|
|
||||||
|
val admin = createAdminClient()
|
||||||
|
val err = assertThrowsException(classOf[InvalidConfigurationException],
|
||||||
|
() => TestUtils.createTopicWithAdmin(admin, testTopicName, brokers, controllerServers, numPartitions,
|
||||||
|
numReplicationFactor, topicConfig = topicConfig))
|
||||||
|
assertEquals(errorMsgMs, err.getMessage)
|
||||||
|
|
||||||
|
// 2. change the local.retention.ms value to the same value as retention.ms should successfully create the topic
|
||||||
|
topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, "1000")
|
||||||
|
TestUtils.createTopicWithAdmin(admin, testTopicName, brokers, controllerServers, numPartitions, numReplicationFactor,
|
||||||
|
topicConfig = topicConfig)
|
||||||
|
|
||||||
|
// 3. change the local.retention.ms value to "-2" should also successfully create the topic
|
||||||
|
topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, "-2")
|
||||||
|
TestUtils.createTopicWithAdmin(admin, testTopicName2, brokers, controllerServers, numPartitions, numReplicationFactor,
|
||||||
|
topicConfig = topicConfig)
|
||||||
|
|
||||||
|
// 4. create a topic with `remote.log.copy.disable=false` and have different local.retention.ms and retention.ms value,
|
||||||
|
// it should successfully creates the topic.
|
||||||
|
topicConfig.clear()
|
||||||
|
topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
|
||||||
|
topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, "100")
|
||||||
|
topicConfig.put(TopicConfig.RETENTION_MS_CONFIG, "1000")
|
||||||
|
topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "-2")
|
||||||
|
TestUtils.createTopicWithAdmin(admin, testTopicName3, brokers, controllerServers, numPartitions, numReplicationFactor,
|
||||||
|
topicConfig = topicConfig)
|
||||||
|
|
||||||
|
// 5. alter the config to `remote.log.copy.disable=true`, it should fail the config change
|
||||||
|
val configs = new util.HashMap[ConfigResource, util.Collection[AlterConfigOp]]()
|
||||||
|
configs.put(new ConfigResource(ConfigResource.Type.TOPIC, testTopicName3),
|
||||||
|
util.Arrays.asList(
|
||||||
|
new AlterConfigOp(new ConfigEntry(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, "true"),
|
||||||
|
AlterConfigOp.OpType.SET),
|
||||||
|
))
|
||||||
|
val err2 = assertThrowsException(classOf[InvalidConfigurationException],
|
||||||
|
() => admin.incrementalAlterConfigs(configs).all().get())
|
||||||
|
assertEquals(errorMsgMs, err2.getMessage)
|
||||||
|
|
||||||
|
// 6. alter the config to `remote.log.copy.disable=true` and local.retention.ms == retention.ms, it should work without error
|
||||||
|
configs.put(new ConfigResource(ConfigResource.Type.TOPIC, testTopicName3),
|
||||||
|
util.Arrays.asList(
|
||||||
|
new AlterConfigOp(new ConfigEntry(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, "true"),
|
||||||
|
AlterConfigOp.OpType.SET),
|
||||||
|
new AlterConfigOp(new ConfigEntry(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, "1000"),
|
||||||
|
AlterConfigOp.OpType.SET),
|
||||||
|
))
|
||||||
|
|
||||||
|
admin.incrementalAlterConfigs(configs).all().get()
|
||||||
|
}
|
||||||
|
|
||||||
|
@ParameterizedTest
|
||||||
|
@ValueSource(strings = Array("kraft"))
|
||||||
|
def testCreateTopicRetentionBytesValidationWithRemoteCopyDisabled(quorum: String): Unit = {
|
||||||
|
val testTopicName2 = testTopicName + "2"
|
||||||
|
val testTopicName3 = testTopicName + "3"
|
||||||
|
val errorMsgBytes = "When `remote.log.copy.disable` is set to true, the `local.retention.bytes` and `retention.bytes` " +
|
||||||
|
"must be set to the identical value because there will be no more logs copied to the remote storage."
|
||||||
|
|
||||||
|
// 1. create a topic with `remote.log.copy.disable=true` and have different local.retention.bytes and retention.bytes value,
|
||||||
|
// it should fail to create the topic
|
||||||
|
val topicConfig = new Properties()
|
||||||
|
topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
|
||||||
|
topicConfig.put(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, "true")
|
||||||
|
topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "100")
|
||||||
|
topicConfig.put(TopicConfig.RETENTION_BYTES_CONFIG, "1000")
|
||||||
|
topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, "-2")
|
||||||
|
|
||||||
|
val admin = createAdminClient()
|
||||||
|
val err = assertThrowsException(classOf[InvalidConfigurationException],
|
||||||
|
() => TestUtils.createTopicWithAdmin(admin, testTopicName, brokers, controllerServers, numPartitions,
|
||||||
|
numReplicationFactor, topicConfig = topicConfig))
|
||||||
|
assertEquals(errorMsgBytes, err.getMessage)
|
||||||
|
|
||||||
|
// 2. change the local.retention.bytes value to the same value as retention.bytes should successfully create the topic
|
||||||
|
topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "1000")
|
||||||
|
TestUtils.createTopicWithAdmin(admin, testTopicName, brokers, controllerServers, numPartitions, numReplicationFactor,
|
||||||
|
topicConfig = topicConfig)
|
||||||
|
|
||||||
|
// 3. change the local.retention.bytes value to "-2" should also successfully create the topic
|
||||||
|
topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "-2")
|
||||||
|
TestUtils.createTopicWithAdmin(admin, testTopicName2, brokers, controllerServers, numPartitions, numReplicationFactor,
|
||||||
|
topicConfig = topicConfig)
|
||||||
|
|
||||||
|
// 4. create a topic with `remote.log.copy.disable=false` and have different local.retention.bytes and retention.bytes value,
|
||||||
|
// it should successfully creates the topic.
|
||||||
|
topicConfig.clear()
|
||||||
|
topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
|
||||||
|
topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "100")
|
||||||
|
topicConfig.put(TopicConfig.RETENTION_BYTES_CONFIG, "1000")
|
||||||
|
topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, "-2")
|
||||||
|
TestUtils.createTopicWithAdmin(admin, testTopicName3, brokers, controllerServers, numPartitions, numReplicationFactor,
|
||||||
|
topicConfig = topicConfig)
|
||||||
|
|
||||||
|
// 5. alter the config to `remote.log.copy.disable=true`, it should fail the config change
|
||||||
|
val configs = new util.HashMap[ConfigResource, util.Collection[AlterConfigOp]]()
|
||||||
|
configs.put(new ConfigResource(ConfigResource.Type.TOPIC, testTopicName3),
|
||||||
|
util.Arrays.asList(
|
||||||
|
new AlterConfigOp(new ConfigEntry(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, "true"),
|
||||||
|
AlterConfigOp.OpType.SET),
|
||||||
|
))
|
||||||
|
val err2 = assertThrowsException(classOf[InvalidConfigurationException],
|
||||||
|
() => admin.incrementalAlterConfigs(configs).all().get())
|
||||||
|
assertEquals(errorMsgBytes, err2.getMessage)
|
||||||
|
|
||||||
|
// 6. alter the config to `remote.log.copy.disable=true` and local.retention.bytes == retention.bytes, it should work without error
|
||||||
|
configs.put(new ConfigResource(ConfigResource.Type.TOPIC, testTopicName3),
|
||||||
|
util.Arrays.asList(
|
||||||
|
new AlterConfigOp(new ConfigEntry(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, "true"),
|
||||||
|
AlterConfigOp.OpType.SET),
|
||||||
|
new AlterConfigOp(new ConfigEntry(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "1000"),
|
||||||
|
AlterConfigOp.OpType.SET),
|
||||||
|
))
|
||||||
|
admin.incrementalAlterConfigs(configs).all().get()
|
||||||
|
}
|
||||||
|
|
||||||
@ParameterizedTest
|
@ParameterizedTest
|
||||||
@ValueSource(strings = Array("zk", "kraft"))
|
@ValueSource(strings = Array("zk", "kraft"))
|
||||||
def testEnableRemoteLogOnExistingTopicTest(quorum: String): Unit = {
|
def testEnableRemoteLogOnExistingTopicTest(quorum: String): Unit = {
|
||||||
|
|
|
@ -36,6 +36,7 @@ import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
|
||||||
import org.apache.kafka.server.config.ServerLogConfigs
|
import org.apache.kafka.server.config.ServerLogConfigs
|
||||||
import org.apache.kafka.server.util.Scheduler
|
import org.apache.kafka.server.util.Scheduler
|
||||||
import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile
|
import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile
|
||||||
|
import org.apache.kafka.storage.internals.log.LogConfig.{DEFAULT_REMOTE_LOG_COPY_DISABLE_CONFIG, DEFAULT_REMOTE_LOG_DELETE_ON_DISABLE_CONFIG}
|
||||||
import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, FetchDataInfo, FetchIsolation, LazyIndex, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogOffsetsListener, LogSegment, ProducerStateManager, ProducerStateManagerConfig, TransactionIndex}
|
import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, FetchDataInfo, FetchIsolation, LazyIndex, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogOffsetsListener, LogSegment, ProducerStateManager, ProducerStateManagerConfig, TransactionIndex}
|
||||||
|
|
||||||
import scala.jdk.CollectionConverters._
|
import scala.jdk.CollectionConverters._
|
||||||
|
@ -68,7 +69,9 @@ object LogTestUtils {
|
||||||
indexIntervalBytes: Int = ServerLogConfigs.LOG_INDEX_INTERVAL_BYTES_DEFAULT,
|
indexIntervalBytes: Int = ServerLogConfigs.LOG_INDEX_INTERVAL_BYTES_DEFAULT,
|
||||||
segmentIndexBytes: Int = ServerLogConfigs.LOG_INDEX_SIZE_MAX_BYTES_DEFAULT,
|
segmentIndexBytes: Int = ServerLogConfigs.LOG_INDEX_SIZE_MAX_BYTES_DEFAULT,
|
||||||
fileDeleteDelayMs: Long = ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT,
|
fileDeleteDelayMs: Long = ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT,
|
||||||
remoteLogStorageEnable: Boolean = LogConfig.DEFAULT_REMOTE_STORAGE_ENABLE): LogConfig = {
|
remoteLogStorageEnable: Boolean = LogConfig.DEFAULT_REMOTE_STORAGE_ENABLE,
|
||||||
|
remoteLogCopyDisable: Boolean = DEFAULT_REMOTE_LOG_COPY_DISABLE_CONFIG,
|
||||||
|
remoteLogDeleteOnDisable: Boolean = DEFAULT_REMOTE_LOG_DELETE_ON_DISABLE_CONFIG): LogConfig = {
|
||||||
val logProps = new Properties()
|
val logProps = new Properties()
|
||||||
logProps.put(TopicConfig.SEGMENT_MS_CONFIG, segmentMs: java.lang.Long)
|
logProps.put(TopicConfig.SEGMENT_MS_CONFIG, segmentMs: java.lang.Long)
|
||||||
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, segmentBytes: Integer)
|
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, segmentBytes: Integer)
|
||||||
|
@ -83,6 +86,8 @@ object LogTestUtils {
|
||||||
logProps.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, segmentIndexBytes: Integer)
|
logProps.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, segmentIndexBytes: Integer)
|
||||||
logProps.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, fileDeleteDelayMs: java.lang.Long)
|
logProps.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, fileDeleteDelayMs: java.lang.Long)
|
||||||
logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, remoteLogStorageEnable: java.lang.Boolean)
|
logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, remoteLogStorageEnable: java.lang.Boolean)
|
||||||
|
logProps.put(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, remoteLogCopyDisable: java.lang.Boolean)
|
||||||
|
logProps.put(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, remoteLogDeleteOnDisable: java.lang.Boolean)
|
||||||
new LogConfig(logProps)
|
new LogConfig(logProps)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -4188,6 +4188,88 @@ class UnifiedLogTest {
|
||||||
assertEquals(1, log.logSegments.size)
|
assertEquals(1, log.logSegments.size)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
def testRetentionOnLocalLogDeletionWhenRemoteLogCopyDisabled(): Unit = {
|
||||||
|
def createRecords = TestUtils.records(List(new SimpleRecord(mockTime.milliseconds(), "a".getBytes)))
|
||||||
|
val segmentBytes = createRecords.sizeInBytes()
|
||||||
|
val logConfig = LogTestUtils.createLogConfig(segmentBytes = segmentBytes, localRetentionBytes = 1, retentionBytes = segmentBytes * 5,
|
||||||
|
fileDeleteDelayMs = 0, remoteLogStorageEnable = true)
|
||||||
|
val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true)
|
||||||
|
|
||||||
|
// Given 6 segments of 1 message each
|
||||||
|
for (_ <- 0 until 6) {
|
||||||
|
log.appendAsLeader(createRecords, leaderEpoch = 0)
|
||||||
|
}
|
||||||
|
assertEquals(6, log.logSegments.size)
|
||||||
|
|
||||||
|
log.updateHighWatermark(log.logEndOffset)
|
||||||
|
|
||||||
|
// Should not delete local log because highest remote storage offset is -1 (default value)
|
||||||
|
log.deleteOldSegments()
|
||||||
|
assertEquals(6, log.logSegments.size())
|
||||||
|
assertEquals(0, log.logStartOffset)
|
||||||
|
assertEquals(0, log.localLogStartOffset())
|
||||||
|
|
||||||
|
// simulate calls to upload 2 segments to remote storage
|
||||||
|
log.updateHighestOffsetInRemoteStorage(1)
|
||||||
|
|
||||||
|
log.deleteOldSegments()
|
||||||
|
assertEquals(4, log.logSegments.size())
|
||||||
|
assertEquals(0, log.logStartOffset)
|
||||||
|
assertEquals(2, log.localLogStartOffset())
|
||||||
|
|
||||||
|
// add remoteCopyDisabled = true
|
||||||
|
val copyDisabledLogConfig = LogTestUtils.createLogConfig(segmentBytes = segmentBytes, localRetentionBytes = 1, retentionBytes = segmentBytes * 5,
|
||||||
|
fileDeleteDelayMs = 0, remoteLogStorageEnable = true, remoteLogCopyDisable = true)
|
||||||
|
log.updateConfig(copyDisabledLogConfig)
|
||||||
|
|
||||||
|
// No local logs will be deleted even though local retention bytes is 1 because we'll adopt retention.ms/bytes
|
||||||
|
// when remote.log.copy.disable = true
|
||||||
|
log.deleteOldSegments()
|
||||||
|
assertEquals(4, log.logSegments.size())
|
||||||
|
assertEquals(0, log.logStartOffset)
|
||||||
|
assertEquals(2, log.localLogStartOffset())
|
||||||
|
|
||||||
|
// simulate the remote logs are all deleted due to retention policy
|
||||||
|
log.updateLogStartOffsetFromRemoteTier(2)
|
||||||
|
assertEquals(4, log.logSegments.size())
|
||||||
|
assertEquals(2, log.logStartOffset)
|
||||||
|
assertEquals(2, log.localLogStartOffset())
|
||||||
|
|
||||||
|
// produce 3 more segments
|
||||||
|
for (_ <- 0 until 3) {
|
||||||
|
log.appendAsLeader(createRecords, leaderEpoch = 0)
|
||||||
|
}
|
||||||
|
assertEquals(7, log.logSegments.size)
|
||||||
|
log.updateHighWatermark(log.logEndOffset)
|
||||||
|
|
||||||
|
// try to delete local logs again, 2 segments will be deleted this time because we'll adopt retention.ms/bytes (retention.bytes = 5)
|
||||||
|
// when remote.log.copy.disable = true
|
||||||
|
log.deleteOldSegments()
|
||||||
|
assertEquals(5, log.logSegments.size())
|
||||||
|
assertEquals(4, log.logStartOffset)
|
||||||
|
assertEquals(4, log.localLogStartOffset())
|
||||||
|
|
||||||
|
// add localRetentionMs = 1, retentionMs = 1000
|
||||||
|
val retentionMsConfig = LogTestUtils.createLogConfig(segmentBytes = segmentBytes, localRetentionMs = 1, retentionMs = 1000,
|
||||||
|
fileDeleteDelayMs = 0, remoteLogStorageEnable = true, remoteLogCopyDisable = true)
|
||||||
|
log.updateConfig(retentionMsConfig)
|
||||||
|
|
||||||
|
// Should not delete any logs because no local logs expired using retention.ms = 1000
|
||||||
|
mockTime.sleep(10)
|
||||||
|
log.deleteOldSegments()
|
||||||
|
assertEquals(5, log.logSegments.size())
|
||||||
|
assertEquals(4, log.logStartOffset)
|
||||||
|
assertEquals(4, log.localLogStartOffset())
|
||||||
|
|
||||||
|
// Should delete all logs because all of them are expired based on retentionMs = 1000
|
||||||
|
mockTime.sleep(1000)
|
||||||
|
log.deleteOldSegments()
|
||||||
|
assertEquals(1, log.logSegments.size())
|
||||||
|
assertEquals(9, log.logStartOffset)
|
||||||
|
assertEquals(9, log.localLogStartOffset())
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
def testIncrementLocalLogStartOffsetAfterLocalLogDeletion(): Unit = {
|
def testIncrementLocalLogStartOffsetAfterLocalLogDeletion(): Unit = {
|
||||||
val logConfig = LogTestUtils.createLogConfig(localRetentionBytes = 1, fileDeleteDelayMs = 0, remoteLogStorageEnable = true)
|
val logConfig = LogTestUtils.createLogConfig(localRetentionBytes = 1, fileDeleteDelayMs = 0, remoteLogStorageEnable = true)
|
||||||
|
|
|
@ -184,6 +184,8 @@ public class LogConfig extends AbstractConfig {
|
||||||
public static final long DEFAULT_MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS = ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_DEFAULT;
|
public static final long DEFAULT_MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS = ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_DEFAULT;
|
||||||
|
|
||||||
public static final boolean DEFAULT_REMOTE_STORAGE_ENABLE = false;
|
public static final boolean DEFAULT_REMOTE_STORAGE_ENABLE = false;
|
||||||
|
public static final boolean DEFAULT_REMOTE_LOG_COPY_DISABLE_CONFIG = false;
|
||||||
|
public static final boolean DEFAULT_REMOTE_LOG_DELETE_ON_DISABLE_CONFIG = false;
|
||||||
public static final long DEFAULT_LOCAL_RETENTION_BYTES = -2; // It indicates the value to be derived from RetentionBytes
|
public static final long DEFAULT_LOCAL_RETENTION_BYTES = -2; // It indicates the value to be derived from RetentionBytes
|
||||||
public static final long DEFAULT_LOCAL_RETENTION_MS = -2; // It indicates the value to be derived from RetentionMs
|
public static final long DEFAULT_LOCAL_RETENTION_MS = -2; // It indicates the value to be derived from RetentionMs
|
||||||
|
|
||||||
|
@ -635,6 +637,7 @@ public class LogConfig extends AbstractConfig {
|
||||||
validateNoRemoteStorageForCompactedTopic(newConfigs);
|
validateNoRemoteStorageForCompactedTopic(newConfigs);
|
||||||
validateRemoteStorageRetentionSize(newConfigs);
|
validateRemoteStorageRetentionSize(newConfigs);
|
||||||
validateRemoteStorageRetentionTime(newConfigs);
|
validateRemoteStorageRetentionTime(newConfigs);
|
||||||
|
validateRetentionConfigsWhenRemoteCopyDisabled(newConfigs, isRemoteLogStorageEnabled);
|
||||||
} else {
|
} else {
|
||||||
// The new config "remote.storage.enable" is false, validate if it's turning from true to false
|
// The new config "remote.storage.enable" is false, validate if it's turning from true to false
|
||||||
boolean wasRemoteLogEnabled = Boolean.parseBoolean(existingConfigs.getOrDefault(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false"));
|
boolean wasRemoteLogEnabled = Boolean.parseBoolean(existingConfigs.getOrDefault(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false"));
|
||||||
|
@ -651,6 +654,24 @@ public class LogConfig extends AbstractConfig {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static void validateRetentionConfigsWhenRemoteCopyDisabled(Map<?, ?> newConfigs, boolean isRemoteLogStorageEnabled) {
|
||||||
|
boolean isRemoteLogCopyDisabled = (Boolean) Utils.castToStringObjectMap(newConfigs).getOrDefault(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, false);
|
||||||
|
long retentionMs = (Long) newConfigs.get(TopicConfig.RETENTION_MS_CONFIG);
|
||||||
|
long localRetentionMs = (Long) newConfigs.get(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG);
|
||||||
|
long retentionBytes = (Long) newConfigs.get(TopicConfig.RETENTION_BYTES_CONFIG);
|
||||||
|
long localRetentionBytes = (Long) newConfigs.get(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG);
|
||||||
|
if (isRemoteLogStorageEnabled && isRemoteLogCopyDisabled) {
|
||||||
|
if (localRetentionBytes != -2 && localRetentionBytes != retentionBytes) {
|
||||||
|
throw new InvalidConfigurationException("When `remote.log.copy.disable` is set to true, the `local.retention.bytes` " +
|
||||||
|
"and `retention.bytes` must be set to the identical value because there will be no more logs copied to the remote storage.");
|
||||||
|
}
|
||||||
|
if (localRetentionMs != -2 && localRetentionMs != retentionMs) {
|
||||||
|
throw new InvalidConfigurationException("When `remote.log.copy.disable` is set to true, the `local.retention.ms` " +
|
||||||
|
"and `retention.ms` must be set to the identical value because there will be no more logs copied to the remote storage.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public static void validateNoInvalidRemoteStorageConfigsInZK(Map<?, ?> newConfigs) {
|
public static void validateNoInvalidRemoteStorageConfigsInZK(Map<?, ?> newConfigs) {
|
||||||
boolean isRemoteLogDeleteOnDisable = (Boolean) Utils.castToStringObjectMap(newConfigs).getOrDefault(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, false);
|
boolean isRemoteLogDeleteOnDisable = (Boolean) Utils.castToStringObjectMap(newConfigs).getOrDefault(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, false);
|
||||||
boolean isRemoteLogCopyDisabled = (Boolean) Utils.castToStringObjectMap(newConfigs).getOrDefault(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, false);
|
boolean isRemoteLogCopyDisabled = (Boolean) Utils.castToStringObjectMap(newConfigs).getOrDefault(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, false);
|
||||||
|
|
|
@ -60,8 +60,16 @@ public final class DisableRemoteLogOnTopicTest extends TieredStorageTestHarness
|
||||||
final Map<Integer, List<Integer>> assignment = mkMap(
|
final Map<Integer, List<Integer>> assignment = mkMap(
|
||||||
mkEntry(p0, Arrays.asList(broker0, broker1))
|
mkEntry(p0, Arrays.asList(broker0, broker1))
|
||||||
);
|
);
|
||||||
final Map<String, String> disableCopy = new HashMap<>();
|
// local.retention.ms/bytes need to set to the same value as retention.ms/bytes when disabling remote log copy
|
||||||
disableCopy.put(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, "true");
|
final Map<String, String> disableRemoteCopy = new HashMap<>();
|
||||||
|
disableRemoteCopy.put(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, "true");
|
||||||
|
disableRemoteCopy.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "-2");
|
||||||
|
disableRemoteCopy.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, "-2");
|
||||||
|
|
||||||
|
// revert the change to local.retention.bytes
|
||||||
|
final Map<String, String> enableRemoteCopy = new HashMap<>();
|
||||||
|
enableRemoteCopy.put(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, "false");
|
||||||
|
enableRemoteCopy.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "1");
|
||||||
|
|
||||||
final Map<String, String> deleteOnDisable = new HashMap<>();
|
final Map<String, String> deleteOnDisable = new HashMap<>();
|
||||||
deleteOnDisable.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false");
|
deleteOnDisable.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false");
|
||||||
|
@ -78,7 +86,7 @@ public final class DisableRemoteLogOnTopicTest extends TieredStorageTestHarness
|
||||||
new KeyValueSpec("k2", "v2"))
|
new KeyValueSpec("k2", "v2"))
|
||||||
// disable remote log copy
|
// disable remote log copy
|
||||||
.updateTopicConfig(topicA,
|
.updateTopicConfig(topicA,
|
||||||
Collections.singletonMap(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, "true"),
|
disableRemoteCopy,
|
||||||
Collections.emptyList())
|
Collections.emptyList())
|
||||||
|
|
||||||
// make sure we can still consume from the beginning of the topic to read data from local and remote storage
|
// make sure we can still consume from the beginning of the topic to read data from local and remote storage
|
||||||
|
@ -87,16 +95,16 @@ public final class DisableRemoteLogOnTopicTest extends TieredStorageTestHarness
|
||||||
|
|
||||||
// re-enable remote log copy
|
// re-enable remote log copy
|
||||||
.updateTopicConfig(topicA,
|
.updateTopicConfig(topicA,
|
||||||
Collections.singletonMap(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, "false"),
|
enableRemoteCopy,
|
||||||
Collections.emptyList())
|
Collections.emptyList())
|
||||||
|
|
||||||
// make sure the logs can be offloaded
|
// make sure the logs can be offloaded
|
||||||
.expectEarliestLocalOffsetInLogDirectory(topicA, p0, 3L)
|
.expectEarliestLocalOffsetInLogDirectory(topicA, p0, 3L)
|
||||||
.produce(topicA, p0, new KeyValueSpec("k3", "v3"))
|
.produce(topicA, p0, new KeyValueSpec("k3", "v3"))
|
||||||
|
|
||||||
// explicitly disable remote log copy
|
// disable remote log copy again
|
||||||
.updateTopicConfig(topicA,
|
.updateTopicConfig(topicA,
|
||||||
disableCopy,
|
disableRemoteCopy,
|
||||||
Collections.emptyList())
|
Collections.emptyList())
|
||||||
// make sure we can still consume from the beginning of the topic to read data from local and remote storage
|
// make sure we can still consume from the beginning of the topic to read data from local and remote storage
|
||||||
.expectFetchFromTieredStorage(broker0, topicA, p0, 3)
|
.expectFetchFromTieredStorage(broker0, topicA, p0, 3)
|
||||||
|
|
Loading…
Reference in New Issue