MINOR: Remove dead code `maybeWarnIfOversizedRecords` (#19316)

The `metadataVersionSupplier` is unused after this - remove it.

Also remove redundant `metadataVersion.fetchRequestVersion >= 13` check
in `RemoteLeaderEndPoint` - the minimum version returned by this method
is `13`.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Ismael Juma 2025-04-01 06:36:25 -07:00 committed by GitHub
parent e301508b53
commit ccf2510fdd
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 12 additions and 31 deletions

View File

@ -203,7 +203,7 @@ class RemoteLeaderEndPoint(logPrefix: String,
None None
} else { } else {
val metadataVersion = metadataVersionSupplier() val metadataVersion = metadataVersionSupplier()
val version: Short = if (metadataVersion.fetchRequestVersion >= 13 && !fetchData.canUseTopicIds) { val version: Short = if (!fetchData.canUseTopicIds) {
12 12
} else { } else {
metadataVersion.fetchRequestVersion metadataVersion.fetchRequestVersion

View File

@ -47,7 +47,7 @@ class ReplicaFetcherManager(brokerConfig: KafkaConfig,
val leader = new RemoteLeaderEndPoint(logContext.logPrefix, endpoint, fetchSessionHandler, brokerConfig, val leader = new RemoteLeaderEndPoint(logContext.logPrefix, endpoint, fetchSessionHandler, brokerConfig,
replicaManager, quotaManager, metadataVersionSupplier, brokerEpochSupplier) replicaManager, quotaManager, metadataVersionSupplier, brokerEpochSupplier)
new ReplicaFetcherThread(threadName, leader, brokerConfig, failedPartitions, replicaManager, new ReplicaFetcherThread(threadName, leader, brokerConfig, failedPartitions, replicaManager,
quotaManager, logContext.logPrefix, metadataVersionSupplier) quotaManager, logContext.logPrefix)
} }
def shutdown(): Unit = { def shutdown(): Unit = {

View File

@ -18,9 +18,8 @@
package kafka.server package kafka.server
import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.record.MemoryRecords
import org.apache.kafka.common.requests.FetchResponse import org.apache.kafka.common.requests.FetchResponse
import org.apache.kafka.server.common.{MetadataVersion, OffsetAndEpoch} import org.apache.kafka.server.common.OffsetAndEpoch
import org.apache.kafka.storage.internals.log.{LogAppendInfo, LogStartOffsetIncrementReason} import org.apache.kafka.storage.internals.log.{LogAppendInfo, LogStartOffsetIncrementReason}
import java.util.Optional import java.util.Optional
@ -32,8 +31,7 @@ class ReplicaFetcherThread(name: String,
failedPartitions: FailedPartitions, failedPartitions: FailedPartitions,
replicaMgr: ReplicaManager, replicaMgr: ReplicaManager,
quota: ReplicaQuota, quota: ReplicaQuota,
logPrefix: String, logPrefix: String)
metadataVersionSupplier: () => MetadataVersion)
extends AbstractFetcherThread(name = name, extends AbstractFetcherThread(name = name,
clientId = name, clientId = name,
leader = leader, leader = leader,
@ -110,8 +108,6 @@ class ReplicaFetcherThread(name: String,
val log = partition.localLogOrException val log = partition.localLogOrException
val records = toMemoryRecords(FetchResponse.recordsOrFail(partitionData)) val records = toMemoryRecords(FetchResponse.recordsOrFail(partitionData))
maybeWarnIfOversizedRecords(records, topicPartition)
if (fetchOffset != log.logEndOffset) if (fetchOffset != log.logEndOffset)
throw new IllegalStateException("Offset mismatch for partition %s: fetched offset = %d, log end offset = %d.".format( throw new IllegalStateException("Offset mismatch for partition %s: fetched offset = %d, log end offset = %d.".format(
topicPartition, fetchOffset, log.logEndOffset)) topicPartition, fetchOffset, log.logEndOffset))
@ -161,15 +157,6 @@ class ReplicaFetcherThread(name: String,
} }
} }
private def maybeWarnIfOversizedRecords(records: MemoryRecords, topicPartition: TopicPartition): Unit = {
// oversized messages don't cause replication to fail from fetch request version 3 (KIP-74)
if (metadataVersionSupplier().fetchRequestVersion <= 2 && records.sizeInBytes > 0 && records.validBytes <= 0)
error(s"Replication is failing due to a message that is greater than replica.fetch.max.bytes for partition $topicPartition. " +
"This generally occurs when the max.message.bytes has been overridden to exceed this value and a suitably large " +
"message has also been sent. To fix this problem increase replica.fetch.max.bytes in your broker config to be " +
"equal or larger than your settings for max.message.bytes, both at a broker and topic level.")
}
/** /**
* Truncate the log for each partition's epoch based on leader's returned epoch and offset. * Truncate the log for each partition's epoch based on leader's returned epoch and offset.
* The logic for finding the truncation offset is implemented in AbstractFetcherThread.getOffsetTruncationState * The logic for finding the truncation offset is implemented in AbstractFetcherThread.getOffsetTruncationState

View File

@ -100,8 +100,7 @@ class ReplicaFetcherThreadTest {
failedPartitions, failedPartitions,
replicaMgr, replicaMgr,
quota, quota,
logContext.logPrefix, logContext.logPrefix)
() => metadataVersion)
} }
@Test @Test
@ -291,8 +290,7 @@ class ReplicaFetcherThreadTest {
failedPartitions, failedPartitions,
replicaManager, replicaManager,
quota, quota,
logContext.logPrefix, logContext.logPrefix
() => MetadataVersion.MINIMUM_VERSION
) { ) {
override def processPartitionData( override def processPartitionData(
topicPartition: TopicPartition, topicPartition: TopicPartition,
@ -423,8 +421,7 @@ class ReplicaFetcherThreadTest {
failedPartitions, failedPartitions,
replicaManager, replicaManager,
quota, quota,
logContext.logPrefix, logContext.logPrefix
() => MetadataVersion.MINIMUM_VERSION
) )
thread.addPartitions(Map( thread.addPartitions(Map(
@ -515,8 +512,7 @@ class ReplicaFetcherThreadTest {
failedPartitions, failedPartitions,
replicaManager, replicaManager,
quota, quota,
logContext.logPrefix, logContext.logPrefix
() => MetadataVersion.MINIMUM_VERSION
) )
thread.addPartitions(Map( thread.addPartitions(Map(
@ -620,8 +616,7 @@ class ReplicaFetcherThreadTest {
failedPartitions, failedPartitions,
replicaManager, replicaManager,
replicaQuota, replicaQuota,
logContext.logPrefix, logContext.logPrefix)
() => MetadataVersion.MINIMUM_VERSION)
val leaderEpoch = 1 val leaderEpoch = 1

View File

@ -2904,7 +2904,7 @@ class ReplicaManagerTest {
val leader = new RemoteLeaderEndPoint(logContext.logPrefix, blockingSend, fetchSessionHandler, rm.config, val leader = new RemoteLeaderEndPoint(logContext.logPrefix, blockingSend, fetchSessionHandler, rm.config,
rm, quotaManager.follower, () => MetadataVersion.MINIMUM_VERSION, () => 1) rm, quotaManager.follower, () => MetadataVersion.MINIMUM_VERSION, () => 1)
new ReplicaFetcherThread(s"ReplicaFetcherThread-$fetcherId", leader, rm.config, failedPartitions, rm, new ReplicaFetcherThread(s"ReplicaFetcherThread-$fetcherId", leader, rm.config, failedPartitions, rm,
quotaManager.follower, logContext.logPrefix, () => MetadataVersion.MINIMUM_VERSION) { quotaManager.follower, logContext.logPrefix) {
override def doWork(): Unit = { override def doWork(): Unit = {
// In case the thread starts before the partition is added by AbstractFetcherManager, // In case the thread starts before the partition is added by AbstractFetcherManager,
// add it here (it's a no-op if already added) // add it here (it's a no-op if already added)
@ -3363,7 +3363,7 @@ class ReplicaManagerTest {
leader.setReplicaPartitionStateCallback(_ => PartitionState(leaderEpoch = 0)) leader.setReplicaPartitionStateCallback(_ => PartitionState(leaderEpoch = 0))
val fetcher = new ReplicaFetcherThread(threadName, leader, config, failedPartitions, replicaManager, val fetcher = new ReplicaFetcherThread(threadName, leader, config, failedPartitions, replicaManager,
quotaManager, "", () => MetadataVersion.MINIMUM_VERSION) quotaManager, "")
val initialFetchState = InitialFetchState( val initialFetchState = InitialFetchState(
topicId = Some(Uuid.randomUuid()), topicId = Some(Uuid.randomUuid()),

View File

@ -302,8 +302,7 @@ public class ReplicaFetcherThreadBenchmark {
new FailedPartitions(), new FailedPartitions(),
replicaManager, replicaManager,
replicaQuota, replicaQuota,
String.format("[ReplicaFetcher replicaId=%d, leaderId=%d, fetcherId=%d", config.brokerId(), 3, 3), String.format("[ReplicaFetcher replicaId=%d, leaderId=%d, fetcherId=%d", config.brokerId(), 3, 3)
() -> MetadataVersion.MINIMUM_VERSION
); );
pool = partitions; pool = partitions;