KAFKA-17360 local log retention ms/bytes "-2" is not treated correctly (#16996)

1) When local.retention.ms/bytes is set to -2, we did not replace it with the server-side retention.ms/bytes value, so a -2 local retention never took effect.
2) When local.retention.ms/bytes is set to -2, the broker logs messages like this:

```
Deleting segment LogSegment(baseOffset=10045, size=1037087, lastModifiedTime=1724040653922, largestRecordTimestamp=1724040653835) due to local log retention size -2 breach. Local log size after deletion will be 13435280. (kafka.log.UnifiedLog) [kafka-scheduler-6]
```
This is not helpful to users: the message should report the real retention value, not the -2 sentinel.
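To make the intended behavior concrete, here is a minimal sketch of the sentinel resolution, with illustrative names only (this is not the actual Kafka code):

```java
// Illustrative sketch: resolve the -2 sentinel before it is used for
// deletion checks or logging. Constant and method names are hypothetical.
final class LocalRetentionSketch {
    // -2 means "derive the local retention from the complete retention".
    static final long LOCAL_RETENTION_DERIVED = -2L;

    static long effectiveLocalRetentionMs(long localRetentionMs, long retentionMs) {
        // Substitute the server-side retention.ms so both the deletion logic
        // and the log message see the real value instead of -2.
        return localRetentionMs == LOCAL_RETENTION_DERIVED ? retentionMs : localRetentionMs;
    }
}
```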

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
Authored by Kuan-Po Tseng on 2024-08-26 06:22:35 +08:00, committed by GitHub
parent f596a0dffc
commit 57b6c2ef98
4 changed files with 58 additions and 9 deletions

UnifiedLog.scala

```diff
@@ -2308,11 +2308,11 @@ object UnifiedLog extends Logging {
   }

   private[log] def localRetentionMs(config: LogConfig, remoteLogEnabled: Boolean): Long = {
-    if (remoteLogEnabled) config.remoteLogConfig.localRetentionMs else config.retentionMs
+    if (remoteLogEnabled) config.localRetentionMs else config.retentionMs
   }

   private[log] def localRetentionSize(config: LogConfig, remoteLogEnabled: Boolean): Long = {
-    if (remoteLogEnabled) config.remoteLogConfig.localRetentionBytes else config.retentionSize
+    if (remoteLogEnabled) config.localRetentionBytes else config.retentionSize
   }
 }
```
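The call sites now go through accessors on LogConfig rather than reading the nested RemoteLogConfig fields directly. The accessor bodies are not part of the hunks shown here; a hedged sketch of what they plausibly do, given the commit description (field and class names below are invented for the sketch):

```java
// Assumed shape of the LogConfig accessors the diff switches to: they
// resolve the -2 sentinel against the server-side retention settings,
// so callers only ever see effective values.
class LogConfigSketch {
    long retentionMs;            // retention.ms
    long retentionSize;          // retention.bytes
    long rawLocalRetentionMs;    // local.retention.ms, may be the -2 sentinel
    long rawLocalRetentionBytes; // local.retention.bytes, may be the -2 sentinel

    long localRetentionMs() {
        return rawLocalRetentionMs == -2 ? retentionMs : rawLocalRetentionMs;
    }

    long localRetentionBytes() {
        return rawLocalRetentionBytes == -2 ? retentionSize : rawLocalRetentionBytes;
    }
}
```

Keeping the sentinel resolution inside LogConfig means no code path can accidentally operate on, or log, the raw -2 value.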

UnifiedLogTest.scala

```diff
@@ -4097,6 +4097,54 @@ class UnifiedLogTest {
     assertEquals(1, log.logSegments.size)
   }

+  @Test
+  def testRetentionOnLocalLogDeletionWhenRemoteLogCopyEnabledAndDefaultLocalRetentionBytes(): Unit = {
+    def createRecords = TestUtils.records(List(new SimpleRecord(mockTime.milliseconds(), "a".getBytes)))
+    val segmentBytes = createRecords.sizeInBytes()
+    val retentionBytesConfig = LogTestUtils.createLogConfig(segmentBytes = segmentBytes, retentionBytes = 1,
+      fileDeleteDelayMs = 0, remoteLogStorageEnable = true)
+    val log = createLog(logDir, retentionBytesConfig, remoteStorageSystemEnable = true)
+
+    // Given 6 segments of 1 message each
+    for (_ <- 0 until 6) {
+      log.appendAsLeader(createRecords, leaderEpoch = 0)
+    }
+    assertEquals(6, log.logSegments.size)
+
+    log.updateHighWatermark(log.logEndOffset)
+    // simulate calls to upload 2 segments to remote storage
+    log.updateHighestOffsetInRemoteStorage(1)
+    log.deleteOldSegments()
+
+    assertEquals(4, log.logSegments.size())
+    assertEquals(0, log.logStartOffset)
+    assertEquals(2, log.localLogStartOffset())
+  }
+
+  @Test
+  def testRetentionOnLocalLogDeletionWhenRemoteLogCopyEnabledAndDefaultLocalRetentionMs(): Unit = {
+    def createRecords = TestUtils.records(List(new SimpleRecord(mockTime.milliseconds(), "a".getBytes)))
+    val segmentBytes = createRecords.sizeInBytes()
+    val retentionMsConfig = LogTestUtils.createLogConfig(segmentBytes = segmentBytes, retentionMs = 1000,
+      fileDeleteDelayMs = 0, remoteLogStorageEnable = true)
+    val log = createLog(logDir, retentionMsConfig, remoteStorageSystemEnable = true)
+
+    // Given 6 segments of 1 message each
+    for (_ <- 0 until 6) {
+      log.appendAsLeader(createRecords, leaderEpoch = 0)
+    }
+    assertEquals(6, log.logSegments.size)
+
+    log.updateHighWatermark(log.logEndOffset)
+    // simulate calls to upload 2 segments to remote storage
+    log.updateHighestOffsetInRemoteStorage(1)
+    mockTime.sleep(1001)
+    log.deleteOldSegments()
+
+    assertEquals(4, log.logSegments.size())
+    assertEquals(0, log.logStartOffset)
+    assertEquals(2, log.localLogStartOffset())
+  }
+
   @Test
   def testIncrementLocalLogStartOffsetAfterLocalLogDeletion(): Unit = {
     val logConfig = LogTestUtils.createLogConfig(localRetentionBytes = 1, fileDeleteDelayMs = 0, remoteLogStorageEnable = true)
```

LogConfig.java

```diff
@@ -100,11 +100,11 @@ public class LogConfig extends AbstractConfig {
         }
     }

-    public static class RemoteLogConfig {
+    private static class RemoteLogConfig {

-        public final boolean remoteStorageEnable;
-        public final long localRetentionMs;
-        public final long localRetentionBytes;
+        private final boolean remoteStorageEnable;
+        private final long localRetentionMs;
+        private final long localRetentionBytes;

         private RemoteLogConfig(LogConfig config) {
             this.remoteStorageEnable = config.getBoolean(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG);
@@ -334,8 +334,8 @@ public class LogConfig extends AbstractConfig {
     public final List<String> leaderReplicationThrottledReplicas;
     public final List<String> followerReplicationThrottledReplicas;
     public final boolean messageDownConversionEnable;
-    public final RemoteLogConfig remoteLogConfig;
+    private final RemoteLogConfig remoteLogConfig;
     private final int maxMessageSize;
     private final Map<?, ?> props;
```
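For reference, the topic-level configs involved; the retention values below are illustrative, while -2 is the actual default for both local retention settings:

```
# Illustrative configuration for a tiered-storage topic.
remote.storage.enable=true
# Complete (local + remote) retention: 7 days / 1 GiB.
retention.ms=604800000
retention.bytes=1073741824
# -2 (the default) means: inherit the corresponding retention.* value above.
local.retention.ms=-2
local.retention.bytes=-2
```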

BaseDeleteSegmentsTest.java

```diff
@@ -23,6 +23,7 @@ import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;

 import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.DELETE_SEGMENT;
@@ -54,8 +55,8 @@ public abstract class BaseDeleteSegmentsTest extends TieredStorageTestHarness {
                 .expectSegmentToBeOffloaded(broker0, topicA, p0, 1, new KeyValueSpec("k1", "v1"))
                 .expectSegmentToBeOffloaded(broker0, topicA, p0, 2, new KeyValueSpec("k2", "v2"))
                 .expectEarliestLocalOffsetInLogDirectory(topicA, p0, 3L)
-                .produce(topicA, p0, new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1", "v1"),
-                        new KeyValueSpec("k2", "v2"), new KeyValueSpec("k3", "v3"))
+                .produceWithTimestamp(topicA, p0, new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1", "v1"),
+                        new KeyValueSpec("k2", "v2"), new KeyValueSpec("k3", "v3", System.currentTimeMillis() + TimeUnit.DAYS.toMillis(1)))
                 // update the topic config such that it triggers the deletion of segments
                 .updateTopicConfig(topicA, configsToBeAdded(), Collections.emptyList())
                 // expect that the three offloaded remote log segments are deleted
```