KAFKA-17360 local log retention ms/bytes "-2" is not treated correctly (#16932)

1) When local.retention.ms/bytes is set to -2, we didn't replace it with the server-side retention.ms/bytes config, so the -2 local retention value didn't take effect.
2) When setting local.retention.ms/bytes to -2, the following log message appears:

```
Deleting segment LogSegment(baseOffset=10045, size=1037087, lastModifiedTime=1724040653922, largestRecordTimestamp=1724040653835) due to local log retention size -2 breach. Local log size after deletion will be 13435280. (kafka.log.UnifiedLog) [kafka-scheduler-6]
```
This is not helpful for users. We should replace -2 with the real retention value when logging.

Reviewers: Luke Chen <showuon@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Kuan-Po Tseng 2024-08-25 19:46:15 +08:00 committed by GitHub
parent d67c18b4ae
commit 11966a209a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 68 additions and 19 deletions

View File

@ -1584,7 +1584,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
shouldDelete
}
deleteOldSegments(shouldDelete, RetentionMsBreach(this, remoteLogEnabled()))
deleteOldSegments(shouldDelete, RetentionMsBreach(this, remoteLogEnabledAndRemoteCopyEnabled()))
}
private def deleteRetentionSizeBreachedSegments(): Int = {
@ -1601,7 +1601,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
shouldDelete
}
deleteOldSegments(shouldDelete, RetentionSizeBreach(this, remoteLogEnabled()))
deleteOldSegments(shouldDelete, RetentionSizeBreach(this, remoteLogEnabledAndRemoteCopyEnabled()))
}
private def deleteLogStartOffsetBreachedSegments(): Int = {
@ -2351,11 +2351,11 @@ object UnifiedLog extends Logging {
}
private[log] def localRetentionMs(config: LogConfig, remoteLogEnabledAndRemoteCopyEnabled: Boolean): Long = {
if (remoteLogEnabledAndRemoteCopyEnabled) config.remoteLogConfig.localRetentionMs else config.retentionMs
if (remoteLogEnabledAndRemoteCopyEnabled) config.localRetentionMs else config.retentionMs
}
private[log] def localRetentionSize(config: LogConfig, remoteLogEnabledAndRemoteCopyEnabled: Boolean): Long = {
if (remoteLogEnabledAndRemoteCopyEnabled) config.remoteLogConfig.localRetentionBytes else config.retentionSize
if (remoteLogEnabledAndRemoteCopyEnabled) config.localRetentionBytes else config.retentionSize
}
}
@ -2371,19 +2371,19 @@ object LogMetricNames {
}
}
case class RetentionMsBreach(log: UnifiedLog, remoteLogEnabled: Boolean) extends SegmentDeletionReason {
case class RetentionMsBreach(log: UnifiedLog, remoteLogEnabledAndRemoteCopyEnabled: Boolean) extends SegmentDeletionReason {
override def logReason(toDelete: List[LogSegment]): Unit = {
val retentionMs = UnifiedLog.localRetentionMs(log.config, remoteLogEnabled)
val retentionMs = UnifiedLog.localRetentionMs(log.config, remoteLogEnabledAndRemoteCopyEnabled)
toDelete.foreach { segment =>
if (segment.largestRecordTimestamp.isPresent)
if (remoteLogEnabled)
if (remoteLogEnabledAndRemoteCopyEnabled)
log.info(s"Deleting segment $segment due to local log retention time ${retentionMs}ms breach based on the largest " +
s"record timestamp in the segment")
else
log.info(s"Deleting segment $segment due to log retention time ${retentionMs}ms breach based on the largest " +
s"record timestamp in the segment")
else {
if (remoteLogEnabled)
if (remoteLogEnabledAndRemoteCopyEnabled)
log.info(s"Deleting segment $segment due to local log retention time ${retentionMs}ms breach based on the " +
s"last modified time of the segment")
else
@ -2394,12 +2394,12 @@ case class RetentionMsBreach(log: UnifiedLog, remoteLogEnabled: Boolean) extends
}
}
case class RetentionSizeBreach(log: UnifiedLog, remoteLogEnabled: Boolean) extends SegmentDeletionReason {
case class RetentionSizeBreach(log: UnifiedLog, remoteLogEnabledAndRemoteCopyEnabled: Boolean) extends SegmentDeletionReason {
override def logReason(toDelete: List[LogSegment]): Unit = {
var size = log.size
toDelete.foreach { segment =>
size -= segment.size
if (remoteLogEnabled) log.info(s"Deleting segment $segment due to local log retention size ${UnifiedLog.localRetentionSize(log.config, remoteLogEnabled)} breach. " +
if (remoteLogEnabledAndRemoteCopyEnabled) log.info(s"Deleting segment $segment due to local log retention size ${UnifiedLog.localRetentionSize(log.config, remoteLogEnabledAndRemoteCopyEnabled)} breach. " +
s"Local log size after deletion will be $size.")
else log.info(s"Deleting segment $segment due to log retention size ${log.config.retentionSize} breach. Log size " +
s"after deletion will be $size.")

View File

@ -4188,6 +4188,54 @@ class UnifiedLogTest {
assertEquals(1, log.logSegments.size)
}
@Test
def testRetentionOnLocalLogDeletionWhenRemoteLogCopyEnabledAndDefaultLocalRetentionBytes(): Unit = {
def createRecords = TestUtils.records(List(new SimpleRecord(mockTime.milliseconds(), "a".getBytes)))
val segmentBytes = createRecords.sizeInBytes()
val retentionBytesConfig = LogTestUtils.createLogConfig(segmentBytes = segmentBytes, retentionBytes = 1,
fileDeleteDelayMs = 0, remoteLogStorageEnable = true)
val log = createLog(logDir, retentionBytesConfig, remoteStorageSystemEnable = true)
// Given 6 segments of 1 message each
for (_ <- 0 until 6) {
log.appendAsLeader(createRecords, leaderEpoch = 0)
}
assertEquals(6, log.logSegments.size)
log.updateHighWatermark(log.logEndOffset)
// simulate calls to upload 2 segments to remote storage
log.updateHighestOffsetInRemoteStorage(1)
log.deleteOldSegments()
assertEquals(4, log.logSegments.size())
assertEquals(0, log.logStartOffset)
assertEquals(2, log.localLogStartOffset())
}
@Test
def testRetentionOnLocalLogDeletionWhenRemoteLogCopyEnabledAndDefaultLocalRetentionMs(): Unit = {
def createRecords = TestUtils.records(List(new SimpleRecord(mockTime.milliseconds(), "a".getBytes)))
val segmentBytes = createRecords.sizeInBytes()
val retentionBytesConfig = LogTestUtils.createLogConfig(segmentBytes = segmentBytes, retentionMs = 1000,
fileDeleteDelayMs = 0, remoteLogStorageEnable = true)
val log = createLog(logDir, retentionBytesConfig, remoteStorageSystemEnable = true)
// Given 6 segments of 1 message each
for (_ <- 0 until 6) {
log.appendAsLeader(createRecords, leaderEpoch = 0)
}
assertEquals(6, log.logSegments.size)
log.updateHighWatermark(log.logEndOffset)
// simulate calls to upload 2 segments to remote storage
log.updateHighestOffsetInRemoteStorage(1)
mockTime.sleep(1001)
log.deleteOldSegments()
assertEquals(4, log.logSegments.size())
assertEquals(0, log.logStartOffset)
assertEquals(2, log.localLogStartOffset())
}
@Test
def testRetentionOnLocalLogDeletionWhenRemoteLogCopyDisabled(): Unit = {
def createRecords = TestUtils.records(List(new SimpleRecord(mockTime.milliseconds(), "a".getBytes)))

View File

@ -108,13 +108,13 @@ public class LogConfig extends AbstractConfig {
}
}
public static class RemoteLogConfig {
private static class RemoteLogConfig {
public final boolean remoteStorageEnable;
public final boolean remoteLogDeleteOnDisable;
public final boolean remoteLogCopyDisable;
public final long localRetentionMs;
public final long localRetentionBytes;
private final boolean remoteStorageEnable;
private final boolean remoteLogDeleteOnDisable;
private final boolean remoteLogCopyDisable;
private final long localRetentionMs;
private final long localRetentionBytes;
private RemoteLogConfig(LogConfig config) {
this.remoteStorageEnable = config.getBoolean(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG);
@ -375,8 +375,8 @@ public class LogConfig extends AbstractConfig {
public final List<String> leaderReplicationThrottledReplicas;
public final List<String> followerReplicationThrottledReplicas;
public final boolean messageDownConversionEnable;
public final RemoteLogConfig remoteLogConfig;
private final RemoteLogConfig remoteLogConfig;
private final int maxMessageSize;
private final Map<?, ?> props;

View File

@ -23,6 +23,7 @@ import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.DELETE_SEGMENT;
@ -54,8 +55,8 @@ public abstract class BaseDeleteSegmentsTest extends TieredStorageTestHarness {
.expectSegmentToBeOffloaded(broker0, topicA, p0, 1, new KeyValueSpec("k1", "v1"))
.expectSegmentToBeOffloaded(broker0, topicA, p0, 2, new KeyValueSpec("k2", "v2"))
.expectEarliestLocalOffsetInLogDirectory(topicA, p0, 3L)
.produce(topicA, p0, new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1", "v1"),
new KeyValueSpec("k2", "v2"), new KeyValueSpec("k3", "v3"))
.produceWithTimestamp(topicA, p0, new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1", "v1"),
new KeyValueSpec("k2", "v2"), new KeyValueSpec("k3", "v3", System.currentTimeMillis() + TimeUnit.DAYS.toMillis(1)))
// update the topic config such that it triggers the deletion of segments
.updateTopicConfig(topicA, configsToBeAdded(), Collections.emptyList())
// expect that the three offloaded remote log segments are deleted