KAFKA-17937 Cleanup AbstractFetcherThreadTest (#18900)

- Remove AbstractFetcherThreadWithIbp26Test as it tests unsupported IBP
- cleanup AbstractFetcherThreadTest to remove unreachable paths, variables, and code

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Nick Guo 2025-02-25 07:45:47 +08:00 committed by GitHub
parent 009bee75ab
commit d23a61738a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 37 additions and 75 deletions

View File

@ -28,9 +28,10 @@ import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
import org.apache.kafka.storage.internals.log.LogAppendInfo
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Assumptions.assumeTrue
import org.junit.jupiter.api.{BeforeEach, Test}
import kafka.server.FetcherThreadTestUtils.{initialFetchState, mkBatch}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable.ArrayBuffer
@ -39,7 +40,6 @@ import scala.jdk.CollectionConverters._
class AbstractFetcherThreadTest {
val truncateOnFetch = true
val topicIds = Map("topic1" -> Uuid.randomUuid(), "topic2" -> Uuid.randomUuid())
val version = ApiKeys.FETCH.latestVersion()
private val partition1 = new TopicPartition("topic1", 0)
@ -56,7 +56,7 @@ class AbstractFetcherThreadTest {
@Test
def testMetricsRemovedOnShutdown(): Unit = {
val partition = new TopicPartition("topic", 0)
val mockLeaderEndpoint = new MockLeaderEndPoint(truncateOnFetch = truncateOnFetch, version = version)
val mockLeaderEndpoint = new MockLeaderEndPoint(version = version)
val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndpoint)
val fetcher = new MockFetcherThread(mockLeaderEndpoint, mockTierStateMachine)
@ -86,7 +86,7 @@ class AbstractFetcherThreadTest {
@Test
def testConsumerLagRemovedWithPartition(): Unit = {
val partition = new TopicPartition("topic", 0)
val mockLeaderEndpoint = new MockLeaderEndPoint(truncateOnFetch = truncateOnFetch, version = version)
val mockLeaderEndpoint = new MockLeaderEndPoint(version = version)
val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndpoint)
val fetcher = new MockFetcherThread(mockLeaderEndpoint, mockTierStateMachine)
@ -111,7 +111,7 @@ class AbstractFetcherThreadTest {
@Test
def testSimpleFetch(): Unit = {
val partition = new TopicPartition("topic", 0)
val mockLeaderEndpoint = new MockLeaderEndPoint(truncateOnFetch = truncateOnFetch, version = version)
val mockLeaderEndpoint = new MockLeaderEndPoint(version = version)
val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndpoint)
val fetcher = new MockFetcherThread(mockLeaderEndpoint, mockTierStateMachine)
@ -136,7 +136,7 @@ class AbstractFetcherThreadTest {
val partition = new TopicPartition("topic", 0)
val fetchBackOffMs = 250
val mockLeaderEndpoint = new MockLeaderEndPoint(truncateOnFetch = truncateOnFetch, version = version) {
val mockLeaderEndpoint = new MockLeaderEndPoint(version = version) {
override def fetch(fetchRequest: FetchRequest.Builder): Map[TopicPartition, FetchData] = {
throw new UnknownTopicIdException("Topic ID was unknown as expected for this test")
}
@ -178,7 +178,7 @@ class AbstractFetcherThreadTest {
val partition3 = new TopicPartition("topic3", 0)
val fetchBackOffMs = 250
val mockLeaderEndPoint = new MockLeaderEndPoint(truncateOnFetch = truncateOnFetch, version = version) {
val mockLeaderEndPoint = new MockLeaderEndPoint(version = version) {
override def fetch(fetchRequest: FetchRequest.Builder): Map[TopicPartition, FetchData] = {
Map(partition1 -> new FetchData().setErrorCode(Errors.UNKNOWN_TOPIC_ID.code),
partition2 -> new FetchData().setErrorCode(Errors.INCONSISTENT_TOPIC_ID.code),
@ -221,7 +221,7 @@ class AbstractFetcherThreadTest {
@Test
def testFencedTruncation(): Unit = {
val partition = new TopicPartition("topic", 0)
val mockLeaderEndpoint = new MockLeaderEndPoint(truncateOnFetch = truncateOnFetch, version = version)
val mockLeaderEndpoint = new MockLeaderEndPoint(version = version)
val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndpoint)
val fetcher = new MockFetcherThread(mockLeaderEndpoint, mockTierStateMachine, failedPartitions = failedPartitions)
@ -249,7 +249,7 @@ class AbstractFetcherThreadTest {
@Test
def testFencedFetch(): Unit = {
val partition = new TopicPartition("topic", 0)
val mockLeaderEndpoint = new MockLeaderEndPoint(truncateOnFetch = truncateOnFetch, version = version)
val mockLeaderEndpoint = new MockLeaderEndPoint(version = version)
val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndpoint)
val fetcher = new MockFetcherThread(mockLeaderEndpoint, mockTierStateMachine, failedPartitions = failedPartitions)
@ -282,7 +282,7 @@ class AbstractFetcherThreadTest {
@Test
def testUnknownLeaderEpochInTruncation(): Unit = {
val partition = new TopicPartition("topic", 0)
val mockLeaderEndpoint = new MockLeaderEndPoint(truncateOnFetch = truncateOnFetch, version = version)
val mockLeaderEndpoint = new MockLeaderEndPoint(version = version)
val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndpoint)
val fetcher = new MockFetcherThread(mockLeaderEndpoint, mockTierStateMachine, failedPartitions = failedPartitions)
@ -315,7 +315,7 @@ class AbstractFetcherThreadTest {
@Test
def testUnknownLeaderEpochWhileFetching(): Unit = {
val partition = new TopicPartition("topic", 0)
val mockLeaderEndpoint = new MockLeaderEndPoint(truncateOnFetch = truncateOnFetch, version = version)
val mockLeaderEndpoint = new MockLeaderEndPoint(version = version)
val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndpoint)
val fetcher = new MockFetcherThread(mockLeaderEndpoint, mockTierStateMachine)
@ -358,7 +358,7 @@ class AbstractFetcherThreadTest {
@Test
def testTruncation(): Unit = {
val partition = new TopicPartition("topic", 0)
val mockLeaderEndpoint = new MockLeaderEndPoint(truncateOnFetch = truncateOnFetch, version = version)
val mockLeaderEndpoint = new MockLeaderEndPoint(version = version)
val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndpoint)
val fetcher = new MockFetcherThread(mockLeaderEndpoint, mockTierStateMachine)
@ -394,7 +394,7 @@ class AbstractFetcherThreadTest {
def testTruncateToHighWatermarkIfLeaderEpochInfoNotAvailable(): Unit = {
val highWatermark = 2L
val partition = new TopicPartition("topic", 0)
val mockLeaderEndPoint = new MockLeaderEndPoint(truncateOnFetch = truncateOnFetch, version = version) {
val mockLeaderEndPoint = new MockLeaderEndPoint(version = version) {
override def fetchEpochEndOffsets(partitions: Map[TopicPartition, EpochData]): Map[TopicPartition, EpochEndOffset] =
throw new UnsupportedOperationException
}
@ -431,7 +431,7 @@ class AbstractFetcherThreadTest {
val highWatermark = 2L
val partition = new TopicPartition("topic", 0)
val mockLeaderEndpoint = new MockLeaderEndPoint(truncateOnFetch = truncateOnFetch, version = version)
val mockLeaderEndpoint = new MockLeaderEndPoint(version = version)
val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndpoint)
val fetcher = new MockFetcherThread(mockLeaderEndpoint, mockTierStateMachine) {
override def truncateToHighWatermark(partitions: Set[TopicPartition]): Unit = {
@ -463,7 +463,7 @@ class AbstractFetcherThreadTest {
val partition = new TopicPartition("topic", 0)
var truncations = 0
val mockLeaderEndpoint = new MockLeaderEndPoint(truncateOnFetch = truncateOnFetch, version = version)
val mockLeaderEndpoint = new MockLeaderEndPoint(version = version)
val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndpoint)
val fetcher = new MockFetcherThread(mockLeaderEndpoint, mockTierStateMachine) {
override def truncate(topicPartition: TopicPartition, truncationState: OffsetTruncationState): Unit = {
@ -505,10 +505,9 @@ class AbstractFetcherThreadTest {
@Test
def testTruncationOnFetchSkippedIfPartitionRemoved(): Unit = {
assumeTrue(truncateOnFetch)
val partition = new TopicPartition("topic", 0)
var truncations = 0
val mockLeaderEndpoint = new MockLeaderEndPoint(truncateOnFetch = truncateOnFetch, version = version)
val mockLeaderEndpoint = new MockLeaderEndPoint(version = version)
val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndpoint)
val fetcher = new MockFetcherThread(mockLeaderEndpoint, mockTierStateMachine) {
override def truncate(topicPartition: TopicPartition, truncationState: OffsetTruncationState): Unit = {
@ -550,7 +549,7 @@ class AbstractFetcherThreadTest {
@Test
def testFollowerFetchOutOfRangeHigh(): Unit = {
val partition = new TopicPartition("topic", 0)
val mockLeaderEndpoint = new MockLeaderEndPoint(truncateOnFetch = truncateOnFetch, version = version)
val mockLeaderEndpoint = new MockLeaderEndPoint(version = version)
val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndpoint)
val fetcher = new MockFetcherThread(mockLeaderEndpoint, mockTierStateMachine)
@ -595,7 +594,7 @@ class AbstractFetcherThreadTest {
val partition = new TopicPartition("topic", 0)
var fetchedEarliestOffset = false
val mockLeaderEndPoint = new MockLeaderEndPoint(truncateOnFetch = truncateOnFetch, version = version) {
val mockLeaderEndPoint = new MockLeaderEndPoint(version = version) {
override def fetchEarliestOffset(topicPartition: TopicPartition, leaderEpoch: Int): OffsetAndEpoch = {
fetchedEarliestOffset = true
throw new FencedLeaderEpochException(s"Epoch $leaderEpoch is fenced")
@ -632,7 +631,7 @@ class AbstractFetcherThreadTest {
@Test
def testFollowerFetchOutOfRangeLow(): Unit = {
val partition = new TopicPartition("topic", 0)
val mockLeaderEndpoint = new MockLeaderEndPoint(truncateOnFetch = truncateOnFetch, version = version)
val mockLeaderEndpoint = new MockLeaderEndPoint(version = version)
val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndpoint)
val fetcher = new MockFetcherThread(mockLeaderEndpoint, mockTierStateMachine, failedPartitions = failedPartitions)
@ -653,11 +652,9 @@ class AbstractFetcherThreadTest {
// initial truncation and verify that the log start offset is updated
fetcher.doWork()
if (truncateOnFetch) {
// Second iteration required here since first iteration is required to
// perform initial truncation based on diverging epoch.
fetcher.doWork()
}
assertEquals(Option(Fetching), fetcher.fetchState(partition).map(_.state))
assertEquals(2, replicaState.logStartOffset)
assertEquals(List(), replicaState.log.toList)
@ -675,7 +672,7 @@ class AbstractFetcherThreadTest {
@Test
def testRetryAfterUnknownLeaderEpochInLatestOffsetFetch(): Unit = {
val partition = new TopicPartition("topic", 0)
val mockLeaderEndPoint = new MockLeaderEndPoint(truncateOnFetch = truncateOnFetch, version = version) {
val mockLeaderEndPoint = new MockLeaderEndPoint(version = version) {
val tries = new AtomicInteger(0)
override def fetchLatestOffset(topicPartition: TopicPartition, leaderEpoch: Int): OffsetAndEpoch = {
if (tries.getAndIncrement() == 0)
@ -719,7 +716,7 @@ class AbstractFetcherThreadTest {
def testCorruptMessage(): Unit = {
val partition = new TopicPartition("topic", 0)
val mockLeaderEndPoint = new MockLeaderEndPoint(truncateOnFetch = truncateOnFetch, version = version) {
val mockLeaderEndPoint = new MockLeaderEndPoint(version = version) {
var fetchedOnce = false
override def fetch(fetchRequest: FetchRequest.Builder): Map[TopicPartition, FetchData] = {
val fetchedData = super.fetch(fetchRequest)
@ -752,30 +749,26 @@ class AbstractFetcherThreadTest {
assertEquals(2L, replicaState.logEndOffset)
}
@Test
def testLeaderEpochChangeDuringFencedFetchEpochsFromLeader(): Unit = {
@ParameterizedTest
@ValueSource(ints = Array(0, 1))
def testParameterizedLeaderEpochChangeDuringFetchEpochsFromLeader(leaderEpochOnLeader: Int): Unit = {
// When leaderEpochOnLeader = 1:
// The leader is on the new epoch when the OffsetsForLeaderEpoch with old epoch is sent, so it
// returns the fence error. Validate that response is ignored if the leader epoch changes on
// the follower while OffsetsForLeaderEpoch request is in flight, but able to truncate and fetch
// in the next of round of "doWork"
testLeaderEpochChangeDuringFetchEpochsFromLeader(leaderEpochOnLeader = 1)
}
@Test
def testLeaderEpochChangeDuringSuccessfulFetchEpochsFromLeader(): Unit = {
// When leaderEpochOnLeader = 0:
// The leader is on the old epoch when the OffsetsForLeaderEpoch with old epoch is sent
// and returns the valid response. Validate that response is ignored if the leader epoch changes
// on the follower while OffsetsForLeaderEpoch request is in flight, but able to truncate and
// fetch once the leader is on the newer epoch (same as follower)
testLeaderEpochChangeDuringFetchEpochsFromLeader(leaderEpochOnLeader = 0)
}
private def testLeaderEpochChangeDuringFetchEpochsFromLeader(leaderEpochOnLeader: Int): Unit = {
val partition = new TopicPartition("topic", 1)
val initialLeaderEpochOnFollower = 0
val nextLeaderEpochOnFollower = initialLeaderEpochOnFollower + 1
val mockLeaderEndpoint = new MockLeaderEndPoint(truncateOnFetch = truncateOnFetch, version = version) {
val mockLeaderEndpoint = new MockLeaderEndPoint(version = version) {
var fetchEpochsFromLeaderOnce = false
override def fetchEpochEndOffsets(partitions: Map[TopicPartition, EpochData]): Map[TopicPartition, EpochEndOffset] = {
@ -831,7 +824,7 @@ class AbstractFetcherThreadTest {
val initialLeaderEpochOnFollower = 0
val nextLeaderEpochOnFollower = initialLeaderEpochOnFollower + 1
val mockLeaderEndpoint = new MockLeaderEndPoint(truncateOnFetch = truncateOnFetch, version = version) {
val mockLeaderEndpoint = new MockLeaderEndPoint(version = version) {
override def fetchEpochEndOffsets(partitions: Map[TopicPartition, EpochData]): Map[TopicPartition, EpochEndOffset] = {
val fetchedEpochs = super.fetchEpochEndOffsets(partitions)
responseCallback.apply()
@ -877,7 +870,7 @@ class AbstractFetcherThreadTest {
@Test
def testTruncationThrowsExceptionIfLeaderReturnsPartitionsNotRequestedInFetchEpochs(): Unit = {
val partition = new TopicPartition("topic", 0)
val mockLeaderEndPoint = new MockLeaderEndPoint(truncateOnFetch = truncateOnFetch, version = version) {
val mockLeaderEndPoint = new MockLeaderEndPoint(version = version) {
override def fetchEpochEndOffsets(partitions: Map[TopicPartition, EpochData]): Map[TopicPartition, EpochEndOffset] = {
val unrequestedTp = new TopicPartition("topic2", 0)
super.fetchEpochEndOffsets(partitions).toMap + (unrequestedTp -> new EpochEndOffset()
@ -901,7 +894,7 @@ class AbstractFetcherThreadTest {
@Test
def testFetcherThreadHandlingPartitionFailureDuringAppending(): Unit = {
val mockLeaderEndpoint = new MockLeaderEndPoint(truncateOnFetch = truncateOnFetch, version = version)
val mockLeaderEndpoint = new MockLeaderEndPoint(version = version)
val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndpoint)
val fetcherForAppend = new MockFetcherThread(mockLeaderEndpoint, mockTierStateMachine, failedPartitions = failedPartitions) {
override def processPartitionData(topicPartition: TopicPartition, fetchOffset: Long, partitionData: FetchData): Option[LogAppendInfo] = {
@ -917,7 +910,7 @@ class AbstractFetcherThreadTest {
@Test
def testFetcherThreadHandlingPartitionFailureDuringTruncation(): Unit = {
val mockLeaderEndpoint = new MockLeaderEndPoint(truncateOnFetch = truncateOnFetch, version = version)
val mockLeaderEndpoint = new MockLeaderEndPoint(version = version)
val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndpoint)
val fetcherForTruncation = new MockFetcherThread(mockLeaderEndpoint, mockTierStateMachine, failedPartitions = failedPartitions) {
override def truncate(topicPartition: TopicPartition, truncationState: OffsetTruncationState): Unit = {
@ -968,7 +961,7 @@ class AbstractFetcherThreadTest {
@Test
def testDivergingEpochs(): Unit = {
val partition = new TopicPartition("topic", 0)
val mockLeaderEndpoint = new MockLeaderEndPoint(truncateOnFetch = truncateOnFetch, version = version)
val mockLeaderEndpoint = new MockLeaderEndPoint(version = version)
val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndpoint)
val fetcher = new MockFetcherThread(mockLeaderEndpoint, mockTierStateMachine)
@ -1004,13 +997,10 @@ class AbstractFetcherThreadTest {
@Test
def testTruncateOnFetchDoesNotProcessPartitionData(): Unit = {
assumeTrue(truncateOnFetch)
val partition = new TopicPartition("topic", 0)
var truncateCalls = 0
var processPartitionDataCalls = 0
val mockLeaderEndpoint = new MockLeaderEndPoint(truncateOnFetch = truncateOnFetch, version = version)
val mockLeaderEndpoint = new MockLeaderEndPoint(version = version)
val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndpoint)
val fetcher = new MockFetcherThread(mockLeaderEndpoint, mockTierStateMachine) {
override def processPartitionData(topicPartition: TopicPartition, fetchOffset: Long, partitionData: FetchData): Option[LogAppendInfo] = {
@ -1078,7 +1068,7 @@ class AbstractFetcherThreadTest {
@Test
def testMaybeUpdateTopicIds(): Unit = {
val partition = new TopicPartition("topic1", 0)
val mockLeaderEndpoint = new MockLeaderEndPoint(truncateOnFetch = truncateOnFetch, version = version)
val mockLeaderEndpoint = new MockLeaderEndPoint(version = version)
val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndpoint)
val fetcher = new MockFetcherThread(mockLeaderEndpoint, mockTierStateMachine)

View File

@ -1,28 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server
import org.apache.kafka.common.Uuid
class AbstractFetcherThreadWithIbp26Test extends AbstractFetcherThreadTest {
override val truncateOnFetch = false
override val version = 11
override val topicIds = Map.empty[String, Uuid]
}