From ccf2510fdda755b7a8e0900d3316fade46eb533a Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Tue, 1 Apr 2025 06:36:25 -0700 Subject: [PATCH] MINOR: Remove dead code `maybeWarnIfOversizedRecords` (#19316) The `metadataVersionSupplier` is unused after this - remove it. Also remove redundant `metadataVersion.fetchRequestVersion >= 13` check in `RemoteLeaderEndPoint` - the minimum version returned by this method is `13`. Reviewers: Chia-Ping Tsai --- .../kafka/server/RemoteLeaderEndPoint.scala | 2 +- .../kafka/server/ReplicaFetcherManager.scala | 2 +- .../kafka/server/ReplicaFetcherThread.scala | 17 ++--------------- .../kafka/server/ReplicaFetcherThreadTest.scala | 15 +++++---------- .../unit/kafka/server/ReplicaManagerTest.scala | 4 ++-- .../fetcher/ReplicaFetcherThreadBenchmark.java | 3 +-- 6 files changed, 12 insertions(+), 31 deletions(-) diff --git a/core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala b/core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala index ce2eef15e79..2132a962d76 100644 --- a/core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala +++ b/core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala @@ -203,7 +203,7 @@ class RemoteLeaderEndPoint(logPrefix: String, None } else { val metadataVersion = metadataVersionSupplier() - val version: Short = if (metadataVersion.fetchRequestVersion >= 13 && !fetchData.canUseTopicIds) { + val version: Short = if (!fetchData.canUseTopicIds) { 12 } else { metadataVersion.fetchRequestVersion diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala b/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala index 621f50f9168..4ed5b05311d 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala @@ -47,7 +47,7 @@ class ReplicaFetcherManager(brokerConfig: KafkaConfig, val leader = new RemoteLeaderEndPoint(logContext.logPrefix, endpoint, fetchSessionHandler, brokerConfig, replicaManager, quotaManager, metadataVersionSupplier, brokerEpochSupplier) new ReplicaFetcherThread(threadName, leader, brokerConfig, failedPartitions, replicaManager, - quotaManager, logContext.logPrefix, metadataVersionSupplier) + quotaManager, logContext.logPrefix) } def shutdown(): Unit = { diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala index cf2ade754a5..a20849d7b8b 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala @@ -18,9 +18,8 @@ package kafka.server import org.apache.kafka.common.TopicPartition -import org.apache.kafka.common.record.MemoryRecords import org.apache.kafka.common.requests.FetchResponse -import org.apache.kafka.server.common.{MetadataVersion, OffsetAndEpoch} +import org.apache.kafka.server.common.OffsetAndEpoch import org.apache.kafka.storage.internals.log.{LogAppendInfo, LogStartOffsetIncrementReason} import java.util.Optional @@ -32,8 +31,7 @@ class ReplicaFetcherThread(name: String, failedPartitions: FailedPartitions, replicaMgr: ReplicaManager, quota: ReplicaQuota, - logPrefix: String, - metadataVersionSupplier: () => MetadataVersion) + logPrefix: String) extends AbstractFetcherThread(name = name, clientId = name, leader = leader, @@ -110,8 +108,6 @@ class ReplicaFetcherThread(name: String, val log = partition.localLogOrException val records = toMemoryRecords(FetchResponse.recordsOrFail(partitionData)) - maybeWarnIfOversizedRecords(records, topicPartition) - if (fetchOffset != log.logEndOffset) throw new IllegalStateException("Offset mismatch for partition %s: fetched offset = %d, log end offset = %d.".format( topicPartition, fetchOffset, log.logEndOffset)) @@ -161,15 +157,6 @@ class ReplicaFetcherThread(name: String, } } - private def maybeWarnIfOversizedRecords(records: MemoryRecords, topicPartition: TopicPartition): Unit = { - // oversized messages don't cause replication to fail from fetch request version 3 (KIP-74) - if (metadataVersionSupplier().fetchRequestVersion <= 2 && records.sizeInBytes > 0 && records.validBytes <= 0) - error(s"Replication is failing due to a message that is greater than replica.fetch.max.bytes for partition $topicPartition. " + - "This generally occurs when the max.message.bytes has been overridden to exceed this value and a suitably large " + - "message has also been sent. To fix this problem increase replica.fetch.max.bytes in your broker config to be " + - "equal or larger than your settings for max.message.bytes, both at a broker and topic level.") - } - /** * Truncate the log for each partition's epoch based on leader's returned epoch and offset. * The logic for finding the truncation offset is implemented in AbstractFetcherThread.getOffsetTruncationState diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala index 51be4501e88..409aaf57b73 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala @@ -100,8 +100,7 @@ class ReplicaFetcherThreadTest { failedPartitions, replicaMgr, quota, - logContext.logPrefix, - () => metadataVersion) + logContext.logPrefix) } @Test @@ -291,8 +290,7 @@ class ReplicaFetcherThreadTest { failedPartitions, replicaManager, quota, - logContext.logPrefix, - () => MetadataVersion.MINIMUM_VERSION + logContext.logPrefix ) { override def processPartitionData( topicPartition: TopicPartition, @@ -423,8 +421,7 @@ class ReplicaFetcherThreadTest { failedPartitions, replicaManager, quota, - logContext.logPrefix, - () => MetadataVersion.MINIMUM_VERSION + logContext.logPrefix ) thread.addPartitions(Map( @@ -515,8 +512,7 @@ class ReplicaFetcherThreadTest { failedPartitions, replicaManager, quota, - logContext.logPrefix, - () => MetadataVersion.MINIMUM_VERSION + logContext.logPrefix ) thread.addPartitions(Map( @@ -620,8 +616,7 @@ class ReplicaFetcherThreadTest { failedPartitions, replicaManager, replicaQuota, - logContext.logPrefix, - () => MetadataVersion.MINIMUM_VERSION) + logContext.logPrefix) val leaderEpoch = 1 diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index fda7eceab7f..2c65ad960cb 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -2904,7 +2904,7 @@ class ReplicaManagerTest { val leader = new RemoteLeaderEndPoint(logContext.logPrefix, blockingSend, fetchSessionHandler, rm.config, rm, quotaManager.follower, () => MetadataVersion.MINIMUM_VERSION, () => 1) new ReplicaFetcherThread(s"ReplicaFetcherThread-$fetcherId", leader, rm.config, failedPartitions, rm, - quotaManager.follower, logContext.logPrefix, () => MetadataVersion.MINIMUM_VERSION) { + quotaManager.follower, logContext.logPrefix) { override def doWork(): Unit = { // In case the thread starts before the partition is added by AbstractFetcherManager, // add it here (it's a no-op if already added) @@ -3363,7 +3363,7 @@ class ReplicaManagerTest { leader.setReplicaPartitionStateCallback(_ => PartitionState(leaderEpoch = 0)) val fetcher = new ReplicaFetcherThread(threadName, leader, config, failedPartitions, replicaManager, - quotaManager, "", () => MetadataVersion.MINIMUM_VERSION) + quotaManager, "") val initialFetchState = InitialFetchState( topicId = Some(Uuid.randomUuid()), diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java index db23fd55eab..fcdc820311e 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java @@ -302,8 +302,7 @@ public class ReplicaFetcherThreadBenchmark { new FailedPartitions(), replicaManager, replicaQuota, - String.format("[ReplicaFetcher replicaId=%d, leaderId=%d, fetcherId=%d", config.brokerId(), 3, 3), - () -> MetadataVersion.MINIMUM_VERSION + String.format("[ReplicaFetcher replicaId=%d, leaderId=%d, fetcherId=%d", config.brokerId(), 3, 3) ); pool = partitions;