MINOR: Followup KAFKA-19112 document updated (#20492)

Some sections of the documentation are not very clear, so this follow-up
updates them.

Reviewers: TengYao Chi <kitingiao@gmail.com>, Jun Rao <junrao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
Ken Huang 2025-09-28 19:06:06 +08:00 committed by GitHub
parent fb0518c34e
commit 41611b4bd2
4 changed files with 56 additions and 6 deletions

core/src/main/scala/kafka/cluster/Partition.scala

@@ -1658,7 +1658,7 @@ class Partition(val topicPartition: TopicPartition,
   def deleteRecordsOnLeader(offset: Long): LogDeleteRecordsResult = inReadLock(leaderIsrUpdateLock) {
     leaderLogIfLocal match {
       case Some(leaderLog) =>
-        if (!leaderLog.config.delete)
+        if (!leaderLog.config.delete && leaderLog.config.compact)
           throw new PolicyViolationException(s"Records of partition $topicPartition can not be deleted due to the configured policy")
         val convertedOffset = if (offset == DeleteRecordsRequest.HIGH_WATERMARK)
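
The effect of the new guard: deleteRecords is rejected only when the policy is compact without delete; an empty policy (neither flag set) now passes. A minimal standalone sketch of the predicate, with hypothetical names (not Kafka API), just to make the boolean logic concrete:

import java.util.Set;

public class CleanupPolicyCheck {
    // The guard throws when (!delete && compact); equivalently, deletion is
    // allowed when (delete || !compact).
    static boolean deleteRecordsAllowed(Set<String> cleanupPolicies) {
        boolean delete = cleanupPolicies.contains("delete");
        boolean compact = cleanupPolicies.contains("compact");
        return delete || !compact;
    }

    public static void main(String[] args) {
        System.out.println(deleteRecordsAllowed(Set.of("delete")));             // true
        System.out.println(deleteRecordsAllowed(Set.of()));                     // true: empty policy is now accepted
        System.out.println(deleteRecordsAllowed(Set.of("compact")));            // false: PolicyViolationException
        System.out.println(deleteRecordsAllowed(Set.of("compact", "delete"))); // true
    }
}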

core/src/test/scala/unit/kafka/cluster/PartitionTest.scala

@@ -21,7 +21,7 @@ import com.yammer.metrics.core.Metric
 import kafka.log.LogManager
 import kafka.server._
 import kafka.utils._
-import org.apache.kafka.common.errors.{ApiException, FencedLeaderEpochException, InconsistentTopicIdException, InvalidTxnStateException, NotLeaderOrFollowerException, OffsetNotAvailableException, OffsetOutOfRangeException, UnknownLeaderEpochException}
+import org.apache.kafka.common.errors.{ApiException, FencedLeaderEpochException, InconsistentTopicIdException, InvalidTxnStateException, NotLeaderOrFollowerException, OffsetNotAvailableException, OffsetOutOfRangeException, PolicyViolationException, UnknownLeaderEpochException}
 import org.apache.kafka.common.message.{AlterPartitionResponseData, FetchResponseData}
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
@@ -61,7 +61,7 @@ import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams, Unexpec
 import org.apache.kafka.server.util.{KafkaScheduler, MockTime}
 import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints
 import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
-import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanerConfig, EpochEntry, LocalLog, LogAppendInfo, LogDirFailureChannel, LogLoader, LogOffsetMetadata, LogOffsetsListener, LogReadInfo, LogSegments, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig, UnifiedLog, VerificationGuard}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanerConfig, EpochEntry, LocalLog, LogAppendInfo, LogConfig, LogDirFailureChannel, LogLoader, LogOffsetMetadata, LogOffsetsListener, LogReadInfo, LogSegments, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig, UnifiedLog, VerificationGuard}
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
@@ -4030,4 +4030,46 @@ class PartitionTest extends AbstractPartitionTest {
       alterPartitionManager)
     partition.tryCompleteDelayedRequests()
   }
+
+  @Test
+  def testDeleteRecordsOnLeaderWithEmptyPolicy(): Unit = {
+    val leaderEpoch = 5
+    val partition = setupPartitionWithMocks(leaderEpoch, isLeader = true)
+    // An empty cleanup.policy enables neither delete nor compact.
+    val emptyPolicyConfig = new LogConfig(util.Map.of(
+      TopicConfig.CLEANUP_POLICY_CONFIG, ""
+    ))
+    val mockLog = mock(classOf[UnifiedLog])
+    when(mockLog.config).thenReturn(emptyPolicyConfig)
+    when(mockLog.logEndOffset).thenReturn(2L)
+    when(mockLog.logStartOffset).thenReturn(0L)
+    when(mockLog.highWatermark).thenReturn(2L)
+    when(mockLog.maybeIncrementLogStartOffset(any(), any())).thenReturn(true)
+    partition.setLog(mockLog, false)
+
+    // With an empty policy, deleteRecords succeeds.
+    val result = partition.deleteRecordsOnLeader(1L)
+    assertEquals(1L, result.requestedOffset)
+  }
+
+  @Test
+  def testDeleteRecordsOnLeaderWithCompactPolicy(): Unit = {
+    val leaderEpoch = 5
+    val partition = setupPartitionWithMocks(leaderEpoch, isLeader = true)
+    val compactPolicyConfig = new LogConfig(util.Map.of(
+      TopicConfig.CLEANUP_POLICY_CONFIG, "compact"
+    ))
+    val mockLog = mock(classOf[UnifiedLog])
+    when(mockLog.config).thenReturn(compactPolicyConfig)
+    when(mockLog.logEndOffset).thenReturn(2L)
+    when(mockLog.logStartOffset).thenReturn(0L)
+    when(mockLog.highWatermark).thenReturn(2L)
+    when(mockLog.maybeIncrementLogStartOffset(any(), any())).thenReturn(true)
+    partition.setLog(mockLog, false)
+
+    // A compact-only policy still rejects deleteRecords.
+    assertThrows(classOf[PolicyViolationException], () => partition.deleteRecordsOnLeader(1L))
+  }
 }

docs/upgrade.html

@@ -136,9 +136,17 @@
                 settings.
             </li>
             <li>
-                The <code>cleanup.policy</code> is empty and <code>remote.storage.enable</code> is set to true, the
-                local log segments will be cleaned based on the values of <code>log.local.retention.bytes</code> and
-                <code>log.local.retention.ms</code>.
+                <code>cleanup.policy</code> now supports empty values, which means infinite retention.
+                This is equivalent to setting <code>retention.ms=-1</code> and <code>retention.bytes=-1</code>.
+                <br>
+                If <code>cleanup.policy</code> is empty and <code>remote.storage.enable</code> is set to true, the
+                local log segments will be cleaned based on the values of <code>log.local.retention.bytes</code> and
+                <code>log.local.retention.ms</code>.
+                <br>
+                If <code>cleanup.policy</code> is empty and <code>remote.storage.enable</code> is set to false,
+                local log segments will not be deleted automatically. However, records can still be deleted
+                explicitly through <code>deleteRecords</code> API calls, which will advance the log start offset
+                and remove the corresponding log segments.
             </li>
         </ul>
     </li>
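
The documented behavior above can be exercised through the public Admin API. A hedged sketch, assuming a broker at localhost:9092 and a topic named my-topic (both placeholders for the example): set an empty cleanup.policy, then advance the log start offset explicitly with deleteRecords.

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.TopicConfig;

import java.util.List;
import java.util.Map;
import java.util.Properties;

public class EmptyCleanupPolicyExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // An empty cleanup.policy means infinite retention
            // (equivalent to retention.ms=-1 and retention.bytes=-1).
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "my-topic");
            AlterConfigOp emptyPolicy = new AlterConfigOp(
                new ConfigEntry(TopicConfig.CLEANUP_POLICY_CONFIG, ""),
                AlterConfigOp.OpType.SET);
            admin.incrementalAlterConfigs(Map.of(topic, List.of(emptyPolicy))).all().get();

            // Records below offset 100 can still be removed explicitly; this advances
            // the log start offset and lets the broker delete the covered segments.
            TopicPartition tp = new TopicPartition("my-topic", 0);
            admin.deleteRecords(Map.of(tp, RecordsToDelete.beforeOffset(100L))).all().get();
        }
    }
}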

storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java

@@ -1934,8 +1934,8 @@ public class UnifiedLog implements AutoCloseable {
                 deleteRetentionSizeBreachedSegments() +
                 deleteRetentionMsBreachedSegments();
         } else {
-            // If cleanup.policy is empty and remote storage is disabled, we should not delete any local
-            // log segments
+            // If cleanup.policy is empty and remote storage is disabled, we should not delete any local log segments
+            // unless the log start offset advances through deleteRecords
             return deleteLogStartOffsetBreachedSegments();
         }
     }
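
The rewritten comment corresponds to the deleteLogStartOffsetBreachedSegments() path: with an empty cleanup.policy and remote storage disabled, a segment becomes deletable only once it lies entirely below the log start offset, which in this configuration only moves via deleteRecords. A rough sketch of that eligibility rule, with hypothetical helper names (the real predicate lives inside UnifiedLog):

public class LogStartOffsetRetentionSketch {
    // A segment covers offsets [baseOffset, nextSegmentBaseOffset - 1], so it is
    // wholly below the log start offset exactly when the next segment begins at
    // or before logStartOffset.
    static boolean breachesLogStartOffset(long nextSegmentBaseOffset, long logStartOffset) {
        return nextSegmentBaseOffset <= logStartOffset;
    }

    public static void main(String[] args) {
        long logStartOffset = 100L; // e.g. advanced by deleteRecords(beforeOffset=100)
        System.out.println(breachesLogStartOffset(100L, logStartOffset)); // true: segment ends at offset 99
        System.out.println(breachesLogStartOffset(150L, logStartOffset)); // false: still holds live offsets
    }
}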