From d23a61738ac44b8ecc6a7f69a83f3d94edaa8f90 Mon Sep 17 00:00:00 2001 From: Nick Guo Date: Tue, 25 Feb 2025 07:45:47 +0800 Subject: [PATCH] KAFKA-17937 Cleanup AbstractFetcherThreadTest (#18900) - Remove AbstractFetcherThreadWithIbp26Test as it tests unsupported IBP - cleanup AbstractFetcherThreadTest to remove unreachable paths, variables, and code Reviewers: Chia-Ping Tsai --- .../server/AbstractFetcherThreadTest.scala | 84 ++++++++----------- .../AbstractFetcherThreadWithIbp26Test.scala | 28 ------- 2 files changed, 37 insertions(+), 75 deletions(-) delete mode 100644 core/src/test/scala/unit/kafka/server/AbstractFetcherThreadWithIbp26Test.scala diff --git a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala index 5f01458ffa7..0e38e9dfcb0 100644 --- a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala +++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala @@ -28,9 +28,10 @@ import org.apache.kafka.server.metrics.KafkaYammerMetrics import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid} import org.apache.kafka.storage.internals.log.LogAppendInfo import org.junit.jupiter.api.Assertions._ -import org.junit.jupiter.api.Assumptions.assumeTrue import org.junit.jupiter.api.{BeforeEach, Test} import kafka.server.FetcherThreadTestUtils.{initialFetchState, mkBatch} +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.ValueSource import java.util.concurrent.atomic.AtomicInteger import scala.collection.mutable.ArrayBuffer @@ -39,7 +40,6 @@ import scala.jdk.CollectionConverters._ class AbstractFetcherThreadTest { - val truncateOnFetch = true val topicIds = Map("topic1" -> Uuid.randomUuid(), "topic2" -> Uuid.randomUuid()) val version = ApiKeys.FETCH.latestVersion() private val partition1 = new TopicPartition("topic1", 0) @@ -56,7 +56,7 @@ class AbstractFetcherThreadTest { @Test def testMetricsRemovedOnShutdown(): Unit = { val partition = new TopicPartition("topic", 0) - val mockLeaderEndpoint = new MockLeaderEndPoint(truncateOnFetch = truncateOnFetch, version = version) + val mockLeaderEndpoint = new MockLeaderEndPoint(version = version) val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndpoint) val fetcher = new MockFetcherThread(mockLeaderEndpoint, mockTierStateMachine) @@ -86,7 +86,7 @@ class AbstractFetcherThreadTest { @Test def testConsumerLagRemovedWithPartition(): Unit = { val partition = new TopicPartition("topic", 0) - val mockLeaderEndpoint = new MockLeaderEndPoint(truncateOnFetch = truncateOnFetch, version = version) + val mockLeaderEndpoint = new MockLeaderEndPoint(version = version) val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndpoint) val fetcher = new MockFetcherThread(mockLeaderEndpoint, mockTierStateMachine) @@ -111,7 +111,7 @@ class AbstractFetcherThreadTest { @Test def testSimpleFetch(): Unit = { val partition = new TopicPartition("topic", 0) - val mockLeaderEndpoint = new MockLeaderEndPoint(truncateOnFetch = truncateOnFetch, version = version) + val mockLeaderEndpoint = new MockLeaderEndPoint(version = version) val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndpoint) val fetcher = new MockFetcherThread(mockLeaderEndpoint, mockTierStateMachine) @@ -136,7 +136,7 @@ class AbstractFetcherThreadTest { val partition = new TopicPartition("topic", 0) val fetchBackOffMs = 250 - val mockLeaderEndpoint = new MockLeaderEndPoint(truncateOnFetch = truncateOnFetch, version = version) { + val mockLeaderEndpoint = new MockLeaderEndPoint(version = version) { override def fetch(fetchRequest: FetchRequest.Builder): Map[TopicPartition, FetchData] = { throw new UnknownTopicIdException("Topic ID was unknown as expected for this test") } @@ -178,7 +178,7 @@ class AbstractFetcherThreadTest { val partition3 = new TopicPartition("topic3", 0) val fetchBackOffMs = 250 - val mockLeaderEndPoint = new MockLeaderEndPoint(truncateOnFetch = truncateOnFetch, version = version) { + val mockLeaderEndPoint = new MockLeaderEndPoint(version = version) { override def fetch(fetchRequest: FetchRequest.Builder): Map[TopicPartition, FetchData] = { Map(partition1 -> new FetchData().setErrorCode(Errors.UNKNOWN_TOPIC_ID.code), partition2 -> new FetchData().setErrorCode(Errors.INCONSISTENT_TOPIC_ID.code), @@ -221,7 +221,7 @@ class AbstractFetcherThreadTest { @Test def testFencedTruncation(): Unit = { val partition = new TopicPartition("topic", 0) - val mockLeaderEndpoint = new MockLeaderEndPoint(truncateOnFetch = truncateOnFetch, version = version) + val mockLeaderEndpoint = new MockLeaderEndPoint(version = version) val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndpoint) val fetcher = new MockFetcherThread(mockLeaderEndpoint, mockTierStateMachine, failedPartitions = failedPartitions) @@ -249,7 +249,7 @@ class AbstractFetcherThreadTest { @Test def testFencedFetch(): Unit = { val partition = new TopicPartition("topic", 0) - val mockLeaderEndpoint = new MockLeaderEndPoint(truncateOnFetch = truncateOnFetch, version = version) + val mockLeaderEndpoint = new MockLeaderEndPoint(version = version) val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndpoint) val fetcher = new MockFetcherThread(mockLeaderEndpoint, mockTierStateMachine, failedPartitions = failedPartitions) @@ -282,7 +282,7 @@ class AbstractFetcherThreadTest { @Test def testUnknownLeaderEpochInTruncation(): Unit = { val partition = new TopicPartition("topic", 0) - val mockLeaderEndpoint = new MockLeaderEndPoint(truncateOnFetch = truncateOnFetch, version = version) + val mockLeaderEndpoint = new MockLeaderEndPoint(version = version) val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndpoint) val fetcher = new MockFetcherThread(mockLeaderEndpoint, mockTierStateMachine, failedPartitions = failedPartitions) @@ -315,7 +315,7 @@ class AbstractFetcherThreadTest { @Test def testUnknownLeaderEpochWhileFetching(): Unit = { val partition = new TopicPartition("topic", 0) - val mockLeaderEndpoint = new MockLeaderEndPoint(truncateOnFetch = truncateOnFetch, version = version) + val mockLeaderEndpoint = new MockLeaderEndPoint(version = version) val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndpoint) val fetcher = new MockFetcherThread(mockLeaderEndpoint, mockTierStateMachine) @@ -358,7 +358,7 @@ class AbstractFetcherThreadTest { @Test def testTruncation(): Unit = { val partition = new TopicPartition("topic", 0) - val mockLeaderEndpoint = new MockLeaderEndPoint(truncateOnFetch = truncateOnFetch, version = version) + val mockLeaderEndpoint = new MockLeaderEndPoint(version = version) val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndpoint) val fetcher = new MockFetcherThread(mockLeaderEndpoint, mockTierStateMachine) @@ -394,7 +394,7 @@ class AbstractFetcherThreadTest { def testTruncateToHighWatermarkIfLeaderEpochInfoNotAvailable(): Unit = { val highWatermark = 2L val partition = new TopicPartition("topic", 0) - val mockLeaderEndPoint = new MockLeaderEndPoint(truncateOnFetch = truncateOnFetch, version = version) { + val mockLeaderEndPoint = new MockLeaderEndPoint(version = version) { override def fetchEpochEndOffsets(partitions: Map[TopicPartition, EpochData]): Map[TopicPartition, EpochEndOffset] = throw new UnsupportedOperationException } @@ -431,7 +431,7 @@ class AbstractFetcherThreadTest { val highWatermark = 2L val partition = new TopicPartition("topic", 0) - val mockLeaderEndpoint = new MockLeaderEndPoint(truncateOnFetch = truncateOnFetch, version = version) + val mockLeaderEndpoint = new MockLeaderEndPoint(version = version) val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndpoint) val fetcher = new MockFetcherThread(mockLeaderEndpoint, mockTierStateMachine) { override def truncateToHighWatermark(partitions: Set[TopicPartition]): Unit = { @@ -463,7 +463,7 @@ class AbstractFetcherThreadTest { val partition = new TopicPartition("topic", 0) var truncations = 0 - val mockLeaderEndpoint = new MockLeaderEndPoint(truncateOnFetch = truncateOnFetch, version = version) + val mockLeaderEndpoint = new MockLeaderEndPoint(version = version) val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndpoint) val fetcher = new MockFetcherThread(mockLeaderEndpoint, mockTierStateMachine) { override def truncate(topicPartition: TopicPartition, truncationState: OffsetTruncationState): Unit = { @@ -505,10 +505,9 @@ class AbstractFetcherThreadTest { @Test def testTruncationOnFetchSkippedIfPartitionRemoved(): Unit = { - assumeTrue(truncateOnFetch) val partition = new TopicPartition("topic", 0) var truncations = 0 - val mockLeaderEndpoint = new MockLeaderEndPoint(truncateOnFetch = truncateOnFetch, version = version) + val mockLeaderEndpoint = new MockLeaderEndPoint(version = version) val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndpoint) val fetcher = new MockFetcherThread(mockLeaderEndpoint, mockTierStateMachine) { override def truncate(topicPartition: TopicPartition, truncationState: OffsetTruncationState): Unit = { @@ -550,7 +549,7 @@ class AbstractFetcherThreadTest { @Test def testFollowerFetchOutOfRangeHigh(): Unit = { val partition = new TopicPartition("topic", 0) - val mockLeaderEndpoint = new MockLeaderEndPoint(truncateOnFetch = truncateOnFetch, version = version) + val mockLeaderEndpoint = new MockLeaderEndPoint(version = version) val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndpoint) val fetcher = new MockFetcherThread(mockLeaderEndpoint, mockTierStateMachine) @@ -595,7 +594,7 @@ class AbstractFetcherThreadTest { val partition = new TopicPartition("topic", 0) var fetchedEarliestOffset = false - val mockLeaderEndPoint = new MockLeaderEndPoint(truncateOnFetch = truncateOnFetch, version = version) { + val mockLeaderEndPoint = new MockLeaderEndPoint(version = version) { override def fetchEarliestOffset(topicPartition: TopicPartition, leaderEpoch: Int): OffsetAndEpoch = { fetchedEarliestOffset = true throw new FencedLeaderEpochException(s"Epoch $leaderEpoch is fenced") @@ -632,7 +631,7 @@ class AbstractFetcherThreadTest { @Test def testFollowerFetchOutOfRangeLow(): Unit = { val partition = new TopicPartition("topic", 0) - val mockLeaderEndpoint = new MockLeaderEndPoint(truncateOnFetch = truncateOnFetch, version = version) + val mockLeaderEndpoint = new MockLeaderEndPoint(version = version) val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndpoint) val fetcher = new MockFetcherThread(mockLeaderEndpoint, mockTierStateMachine, failedPartitions = failedPartitions) @@ -653,11 +652,9 @@ class AbstractFetcherThreadTest { // initial truncation and verify that the log start offset is updated fetcher.doWork() - if (truncateOnFetch) { - // Second iteration required here since first iteration is required to - // perform initial truncation based on diverging epoch. - fetcher.doWork() - } + // Second iteration required here since first iteration is required to + // perform initial truncation based on diverging epoch. + fetcher.doWork() assertEquals(Option(Fetching), fetcher.fetchState(partition).map(_.state)) assertEquals(2, replicaState.logStartOffset) assertEquals(List(), replicaState.log.toList) @@ -675,7 +672,7 @@ class AbstractFetcherThreadTest { @Test def testRetryAfterUnknownLeaderEpochInLatestOffsetFetch(): Unit = { val partition = new TopicPartition("topic", 0) - val mockLeaderEndPoint = new MockLeaderEndPoint(truncateOnFetch = truncateOnFetch, version = version) { + val mockLeaderEndPoint = new MockLeaderEndPoint(version = version) { val tries = new AtomicInteger(0) override def fetchLatestOffset(topicPartition: TopicPartition, leaderEpoch: Int): OffsetAndEpoch = { if (tries.getAndIncrement() == 0) @@ -719,7 +716,7 @@ class AbstractFetcherThreadTest { def testCorruptMessage(): Unit = { val partition = new TopicPartition("topic", 0) - val mockLeaderEndPoint = new MockLeaderEndPoint(truncateOnFetch = truncateOnFetch, version = version) { + val mockLeaderEndPoint = new MockLeaderEndPoint(version = version) { var fetchedOnce = false override def fetch(fetchRequest: FetchRequest.Builder): Map[TopicPartition, FetchData] = { val fetchedData = super.fetch(fetchRequest) @@ -752,30 +749,26 @@ class AbstractFetcherThreadTest { assertEquals(2L, replicaState.logEndOffset) } - @Test - def testLeaderEpochChangeDuringFencedFetchEpochsFromLeader(): Unit = { + @ParameterizedTest + @ValueSource(ints = Array(0, 1)) + def testParameterizedLeaderEpochChangeDuringFetchEpochsFromLeader(leaderEpochOnLeader: Int): Unit = { + // When leaderEpochOnLeader = 1: // The leader is on the new epoch when the OffsetsForLeaderEpoch with old epoch is sent, so it // returns the fence error. Validate that response is ignored if the leader epoch changes on // the follower while OffsetsForLeaderEpoch request is in flight, but able to truncate and fetch // in the next of round of "doWork" - testLeaderEpochChangeDuringFetchEpochsFromLeader(leaderEpochOnLeader = 1) - } - @Test - def testLeaderEpochChangeDuringSuccessfulFetchEpochsFromLeader(): Unit = { + // When leaderEpochOnLeader = 0: // The leader is on the old epoch when the OffsetsForLeaderEpoch with old epoch is sent // and returns the valid response. Validate that response is ignored if the leader epoch changes // on the follower while OffsetsForLeaderEpoch request is in flight, but able to truncate and // fetch once the leader is on the newer epoch (same as follower) - testLeaderEpochChangeDuringFetchEpochsFromLeader(leaderEpochOnLeader = 0) - } - private def testLeaderEpochChangeDuringFetchEpochsFromLeader(leaderEpochOnLeader: Int): Unit = { val partition = new TopicPartition("topic", 1) val initialLeaderEpochOnFollower = 0 val nextLeaderEpochOnFollower = initialLeaderEpochOnFollower + 1 - val mockLeaderEndpoint = new MockLeaderEndPoint(truncateOnFetch = truncateOnFetch, version = version) { + val mockLeaderEndpoint = new MockLeaderEndPoint(version = version) { var fetchEpochsFromLeaderOnce = false override def fetchEpochEndOffsets(partitions: Map[TopicPartition, EpochData]): Map[TopicPartition, EpochEndOffset] = { @@ -831,7 +824,7 @@ class AbstractFetcherThreadTest { val initialLeaderEpochOnFollower = 0 val nextLeaderEpochOnFollower = initialLeaderEpochOnFollower + 1 - val mockLeaderEndpoint = new MockLeaderEndPoint(truncateOnFetch = truncateOnFetch, version = version) { + val mockLeaderEndpoint = new MockLeaderEndPoint(version = version) { override def fetchEpochEndOffsets(partitions: Map[TopicPartition, EpochData]): Map[TopicPartition, EpochEndOffset] = { val fetchedEpochs = super.fetchEpochEndOffsets(partitions) responseCallback.apply() @@ -877,7 +870,7 @@ class AbstractFetcherThreadTest { @Test def testTruncationThrowsExceptionIfLeaderReturnsPartitionsNotRequestedInFetchEpochs(): Unit = { val partition = new TopicPartition("topic", 0) - val mockLeaderEndPoint = new MockLeaderEndPoint(truncateOnFetch = truncateOnFetch, version = version) { + val mockLeaderEndPoint = new MockLeaderEndPoint(version = version) { override def fetchEpochEndOffsets(partitions: Map[TopicPartition, EpochData]): Map[TopicPartition, EpochEndOffset] = { val unrequestedTp = new TopicPartition("topic2", 0) super.fetchEpochEndOffsets(partitions).toMap + (unrequestedTp -> new EpochEndOffset() @@ -901,7 +894,7 @@ class AbstractFetcherThreadTest { @Test def testFetcherThreadHandlingPartitionFailureDuringAppending(): Unit = { - val mockLeaderEndpoint = new MockLeaderEndPoint(truncateOnFetch = truncateOnFetch, version = version) + val mockLeaderEndpoint = new MockLeaderEndPoint(version = version) val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndpoint) val fetcherForAppend = new MockFetcherThread(mockLeaderEndpoint, mockTierStateMachine, failedPartitions = failedPartitions) { override def processPartitionData(topicPartition: TopicPartition, fetchOffset: Long, partitionData: FetchData): Option[LogAppendInfo] = { @@ -917,7 +910,7 @@ class AbstractFetcherThreadTest { @Test def testFetcherThreadHandlingPartitionFailureDuringTruncation(): Unit = { - val mockLeaderEndpoint = new MockLeaderEndPoint(truncateOnFetch = truncateOnFetch, version = version) + val mockLeaderEndpoint = new MockLeaderEndPoint(version = version) val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndpoint) val fetcherForTruncation = new MockFetcherThread(mockLeaderEndpoint, mockTierStateMachine, failedPartitions = failedPartitions) { override def truncate(topicPartition: TopicPartition, truncationState: OffsetTruncationState): Unit = { @@ -968,7 +961,7 @@ class AbstractFetcherThreadTest { @Test def testDivergingEpochs(): Unit = { val partition = new TopicPartition("topic", 0) - val mockLeaderEndpoint = new MockLeaderEndPoint(truncateOnFetch = truncateOnFetch, version = version) + val mockLeaderEndpoint = new MockLeaderEndPoint(version = version) val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndpoint) val fetcher = new MockFetcherThread(mockLeaderEndpoint, mockTierStateMachine) @@ -1004,13 +997,10 @@ class AbstractFetcherThreadTest { @Test def testTruncateOnFetchDoesNotProcessPartitionData(): Unit = { - assumeTrue(truncateOnFetch) - val partition = new TopicPartition("topic", 0) - var truncateCalls = 0 var processPartitionDataCalls = 0 - val mockLeaderEndpoint = new MockLeaderEndPoint(truncateOnFetch = truncateOnFetch, version = version) + val mockLeaderEndpoint = new MockLeaderEndPoint(version = version) val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndpoint) val fetcher = new MockFetcherThread(mockLeaderEndpoint, mockTierStateMachine) { override def processPartitionData(topicPartition: TopicPartition, fetchOffset: Long, partitionData: FetchData): Option[LogAppendInfo] = { @@ -1078,7 +1068,7 @@ class AbstractFetcherThreadTest { @Test def testMaybeUpdateTopicIds(): Unit = { val partition = new TopicPartition("topic1", 0) - val mockLeaderEndpoint = new MockLeaderEndPoint(truncateOnFetch = truncateOnFetch, version = version) + val mockLeaderEndpoint = new MockLeaderEndPoint(version = version) val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndpoint) val fetcher = new MockFetcherThread(mockLeaderEndpoint, mockTierStateMachine) diff --git a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadWithIbp26Test.scala b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadWithIbp26Test.scala deleted file mode 100644 index f2e04a44498..00000000000 --- a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadWithIbp26Test.scala +++ /dev/null @@ -1,28 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.server - -import org.apache.kafka.common.Uuid - -class AbstractFetcherThreadWithIbp26Test extends AbstractFetcherThreadTest { - - override val truncateOnFetch = false - override val version = 11 - override val topicIds = Map.empty[String, Uuid] - -}