KAFKA-18384 Remove ZkAlterPartitionManager (#18364)

Reviewers: Mickael Maison <mickael.maison@gmail.com>
This commit is contained in:
Ken Huang 2025-01-08 01:08:57 +08:00 committed by GitHub
parent abeed20168
commit 9d93a4f68f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 18 additions and 303 deletions

View File

@ -20,7 +20,6 @@ import java.util
import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.{CompletableFuture, ConcurrentHashMap} import java.util.concurrent.{CompletableFuture, ConcurrentHashMap}
import kafka.utils.Logging import kafka.utils.Logging
import kafka.zk.KafkaZkClient
import org.apache.kafka.clients.ClientResponse import org.apache.kafka.clients.ClientResponse
import org.apache.kafka.common.TopicIdPartition import org.apache.kafka.common.TopicIdPartition
import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.TopicPartition
@ -100,17 +99,6 @@ object AlterPartitionManager {
metadataVersionSupplier = () => metadataCache.metadataVersion() metadataVersionSupplier = () => metadataCache.metadataVersion()
) )
} }
/**
* Factory for ZK based implementation, used when IBP < 2.7-IV2
*/
def apply(
scheduler: Scheduler,
time: Time,
zkClient: KafkaZkClient
): AlterPartitionManager = {
new ZkAlterPartitionManager(scheduler, time, zkClient)
}
} }
class DefaultAlterPartitionManager( class DefaultAlterPartitionManager(

View File

@ -1,111 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server
import kafka.utils.{Logging, ReplicationUtils}
import kafka.zk.KafkaZkClient
import org.apache.kafka.common.TopicPartition
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.CompletableFuture
import org.apache.kafka.common.TopicIdPartition
import org.apache.kafka.common.errors.InvalidUpdateVersionException
import org.apache.kafka.common.utils.Time
import org.apache.kafka.metadata.LeaderAndIsr
import org.apache.kafka.server.util.Scheduler
import scala.collection.mutable
/**
* @param checkIntervalMs How often to check for ISR
* @param maxDelayMs Maximum time that an ISR change may be delayed before sending the notification
* @param lingerMs Maximum time to await additional changes before sending the notification
*/
case class IsrChangePropagationConfig(checkIntervalMs: Long, maxDelayMs: Long, lingerMs: Long)
object ZkAlterPartitionManager {
// This field is mutable to allow overriding change notification behavior in test cases
@volatile var DefaultIsrPropagationConfig: IsrChangePropagationConfig = IsrChangePropagationConfig(
checkIntervalMs = 2500,
lingerMs = 5000,
maxDelayMs = 60000,
)
}
class ZkAlterPartitionManager(scheduler: Scheduler, time: Time, zkClient: KafkaZkClient) extends AlterPartitionManager with Logging {
private val isrChangeNotificationConfig = ZkAlterPartitionManager.DefaultIsrPropagationConfig
// Visible for testing
private[server] val isrChangeSet: mutable.Set[TopicPartition] = new mutable.HashSet[TopicPartition]()
private val lastIsrChangeMs = new AtomicLong(time.milliseconds())
private val lastIsrPropagationMs = new AtomicLong(time.milliseconds())
override def start(): Unit = {
scheduler.schedule("isr-change-propagation", () => maybePropagateIsrChanges(), 0L,
isrChangeNotificationConfig.checkIntervalMs)
}
override def submit(
topicIdPartition: TopicIdPartition,
leaderAndIsr: LeaderAndIsr,
controllerEpoch: Int
): CompletableFuture[LeaderAndIsr]= {
debug(s"Writing new ISR ${leaderAndIsr.isr} to ZooKeeper with version " +
s"${leaderAndIsr.partitionEpoch} for partition $topicIdPartition")
val (updateSucceeded, newVersion) = ReplicationUtils.updateLeaderAndIsr(zkClient, topicIdPartition.topicPartition,
leaderAndIsr, controllerEpoch)
val future = new CompletableFuture[LeaderAndIsr]()
if (updateSucceeded) {
// Track which partitions need to be propagated to the controller
isrChangeSet synchronized {
isrChangeSet += topicIdPartition.topicPartition
lastIsrChangeMs.set(time.milliseconds())
}
// We rely on Partition#isrState being properly set to the pending ISR at this point since we are synchronously
// applying the callback
future.complete(leaderAndIsr.withPartitionEpoch(newVersion))
} else {
future.completeExceptionally(new InvalidUpdateVersionException(
s"ISR update $leaderAndIsr for partition $topicIdPartition with controller epoch $controllerEpoch " +
"failed with an invalid version error"))
}
future
}
/**
* This function periodically runs to see if ISR needs to be propagated. It propagates ISR when:
* 1. There is ISR change not propagated yet.
* 2. There is no ISR Change in the last five seconds, or it has been more than 60 seconds since the last ISR propagation.
* This allows an occasional ISR change to be propagated within a few seconds, and avoids overwhelming controller and
* other brokers when large amount of ISR change occurs.
*/
private[server] def maybePropagateIsrChanges(): Unit = {
val now = time.milliseconds()
isrChangeSet synchronized {
if (isrChangeSet.nonEmpty &&
(lastIsrChangeMs.get() + isrChangeNotificationConfig.lingerMs < now ||
lastIsrPropagationMs.get() + isrChangeNotificationConfig.maxDelayMs < now)) {
zkClient.propagateIsrChanges(isrChangeSet)
isrChangeSet.clear()
lastIsrPropagationMs.set(now)
}
}
}
}

View File

@ -1,58 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.utils
import kafka.controller.LeaderIsrAndControllerEpoch
import kafka.zk._
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.metadata.LeaderAndIsr
import scala.jdk.CollectionConverters._
object ReplicationUtils extends Logging {
def updateLeaderAndIsr(zkClient: KafkaZkClient, partition: TopicPartition, newLeaderAndIsr: LeaderAndIsr,
controllerEpoch: Int): (Boolean, Int) = {
debug(s"Updated ISR for $partition to ${newLeaderAndIsr.isr.asScala.mkString(",")}")
val path = TopicPartitionStateZNode.path(partition)
val newLeaderData = TopicPartitionStateZNode.encode(LeaderIsrAndControllerEpoch(newLeaderAndIsr, controllerEpoch))
// use the epoch of the controller that made the leadership decision, instead of the current controller epoch
val updatePersistentPath: (Boolean, Int) = zkClient.conditionalUpdatePath(path, newLeaderData,
newLeaderAndIsr.partitionEpoch, Some(checkLeaderAndIsrZkData))
updatePersistentPath
}
private def checkLeaderAndIsrZkData(zkClient: KafkaZkClient, path: String, expectedLeaderAndIsrInfo: Array[Byte]): (Boolean, Int) = {
try {
val (writtenLeaderOpt, writtenStat) = zkClient.getDataAndStat(path)
val expectedLeaderOpt = TopicPartitionStateZNode.decode(expectedLeaderAndIsrInfo, writtenStat)
val succeeded = writtenLeaderOpt.exists { writtenData =>
val writtenLeaderOpt = TopicPartitionStateZNode.decode(writtenData, writtenStat)
(expectedLeaderOpt, writtenLeaderOpt) match {
case (Some(expectedLeader), Some(writtenLeader)) if expectedLeader == writtenLeader => true
case _ => false
}
}
if (succeeded) (true, writtenStat.getVersion)
else (false, -1)
} catch {
case _: Exception => (false, -1)
}
}
}

View File

@ -21,7 +21,6 @@ import com.yammer.metrics.core.Metric
import kafka.log._ import kafka.log._
import kafka.server._ import kafka.server._
import kafka.utils._ import kafka.utils._
import kafka.zk.KafkaZkClient
import org.apache.kafka.common.errors.{ApiException, FencedLeaderEpochException, InconsistentTopicIdException, InvalidTxnStateException, NotLeaderOrFollowerException, OffsetNotAvailableException, OffsetOutOfRangeException, UnknownLeaderEpochException} import org.apache.kafka.common.errors.{ApiException, FencedLeaderEpochException, InconsistentTopicIdException, InvalidTxnStateException, NotLeaderOrFollowerException, OffsetNotAvailableException, OffsetOutOfRangeException, UnknownLeaderEpochException}
import org.apache.kafka.common.message.{AlterPartitionResponseData, FetchResponseData} import org.apache.kafka.common.message.{AlterPartitionResponseData, FetchResponseData}
import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
@ -36,7 +35,7 @@ import org.apache.kafka.metadata.LeaderRecoveryState
import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test import org.junit.jupiter.api.Test
import org.mockito.ArgumentMatchers import org.mockito.ArgumentMatchers
import org.mockito.ArgumentMatchers.{any, anyBoolean, anyInt, anyLong, anyString} import org.mockito.ArgumentMatchers.{any, anyBoolean, anyInt, anyLong}
import org.mockito.Mockito._ import org.mockito.Mockito._
import org.mockito.invocation.InvocationOnMock import org.mockito.invocation.InvocationOnMock
@ -44,7 +43,7 @@ import java.lang.{Long => JLong}
import java.nio.ByteBuffer import java.nio.ByteBuffer
import java.util.Optional import java.util.Optional
import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, Semaphore} import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, Semaphore}
import kafka.server.metadata.{KRaftMetadataCache, ZkMetadataCache} import kafka.server.metadata.KRaftMetadataCache
import kafka.server.share.DelayedShareFetch import kafka.server.share.DelayedShareFetch
import org.apache.kafka.clients.ClientResponse import org.apache.kafka.clients.ClientResponse
import org.apache.kafka.common.compress.Compression import org.apache.kafka.common.compress.Compression
@ -55,7 +54,6 @@ import org.apache.kafka.common.replica.ClientMetadata.DefaultClientMetadata
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.coordinator.transaction.TransactionLogConfig import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.server.common.{ControllerRequestCompletionHandler, MetadataVersion, NodeToControllerChannelManager, RequestLocal} import org.apache.kafka.server.common.{ControllerRequestCompletionHandler, MetadataVersion, NodeToControllerChannelManager, RequestLocal}
import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0
import org.apache.kafka.server.metrics.KafkaYammerMetrics import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.purgatory.{DelayedOperationPurgatory, TopicPartitionOperationKey} import org.apache.kafka.server.purgatory.{DelayedOperationPurgatory, TopicPartitionOperationKey}
import org.apache.kafka.server.share.fetch.DelayedShareFetchPartitionKey import org.apache.kafka.server.share.fetch.DelayedShareFetchPartitionKey
@ -1503,7 +1501,7 @@ class PartitionTest extends AbstractPartitionTest {
val isrItem = alterPartitionManager.isrUpdates.head val isrItem = alterPartitionManager.isrUpdates.head
assertEquals(isrItem.leaderAndIsr.isr, List(brokerId, remoteBrokerId).map(Int.box).asJava) assertEquals(isrItem.leaderAndIsr.isr, List(brokerId, remoteBrokerId).map(Int.box).asJava)
isrItem.leaderAndIsr.isrWithBrokerEpoch.asScala.foreach { brokerState => isrItem.leaderAndIsr.isrWithBrokerEpoch.asScala.foreach { brokerState =>
// In ZK mode, the broker epochs in the leaderAndIsr should be -1. // the broker epochs in the leaderAndIsr should be -1.
assertEquals(-1, brokerState.brokerEpoch()) assertEquals(-1, brokerState.brokerEpoch())
} }
assertEquals(Set(brokerId), partition.partitionState.isr) assertEquals(Set(brokerId), partition.partitionState.isr)
@ -1682,8 +1680,6 @@ class PartitionTest extends AbstractPartitionTest {
@ParameterizedTest @ParameterizedTest
@ValueSource(strings = Array("kraft")) @ValueSource(strings = Array("kraft"))
def testIsrNotExpandedIfReplicaIsFencedOrShutdown(quorum: String): Unit = { def testIsrNotExpandedIfReplicaIsFencedOrShutdown(quorum: String): Unit = {
val kraft = quorum == "kraft"
val log = logManager.getOrCreateLog(topicPartition, topicId = None) val log = logManager.getOrCreateLog(topicPartition, topicId = None)
seedLogData(log, numRecords = 10, leaderEpoch = 4) seedLogData(log, numRecords = 10, leaderEpoch = 4)
@ -1693,20 +1689,13 @@ class PartitionTest extends AbstractPartitionTest {
val replicas = List(brokerId, remoteBrokerId) val replicas = List(brokerId, remoteBrokerId)
val isr = Set(brokerId) val isr = Set(brokerId)
val metadataCache: MetadataCache = if (kraft) mock(classOf[KRaftMetadataCache]) else mock(classOf[ZkMetadataCache]) val metadataCache = mock(classOf[KRaftMetadataCache])
if (kraft) { addBrokerEpochToMockMetadataCache(metadataCache, replicas)
addBrokerEpochToMockMetadataCache(metadataCache.asInstanceOf[KRaftMetadataCache], replicas)
}
// Mark the remote broker as eligible or ineligible in the metadata cache of the leader. // Mark the remote broker as eligible or ineligible in the metadata cache of the leader.
// When using kraft, we can make the broker ineligible by fencing it. // When using kraft, we can make the broker ineligible by fencing it.
// In ZK mode, we must mark the broker as alive for it to be eligible.
def markRemoteReplicaEligible(eligible: Boolean): Unit = { def markRemoteReplicaEligible(eligible: Boolean): Unit = {
if (kraft) { when(metadataCache.isBrokerFenced(remoteBrokerId)).thenReturn(!eligible)
when(metadataCache.asInstanceOf[KRaftMetadataCache].isBrokerFenced(remoteBrokerId)).thenReturn(!eligible)
} else {
when(metadataCache.hasAliveBroker(remoteBrokerId)).thenReturn(eligible)
}
} }
val partition = new Partition( val partition = new Partition(
@ -1845,7 +1834,7 @@ class PartitionTest extends AbstractPartitionTest {
assertEquals(isr, partition.partitionState.maximalIsr) assertEquals(isr, partition.partitionState.maximalIsr)
// Fetch to let the follower catch up to the log end offset, but using a wrong broker epoch. The expansion should fail. // Fetch to let the follower catch up to the log end offset, but using a wrong broker epoch. The expansion should fail.
addBrokerEpochToMockMetadataCache(metadataCache.asInstanceOf[KRaftMetadataCache], List(brokerId, remoteBrokerId2)) addBrokerEpochToMockMetadataCache(metadataCache, List(brokerId, remoteBrokerId2))
// Create a race case where the replica epoch get bumped right after the previous fetch succeeded. // Create a race case where the replica epoch get bumped right after the previous fetch succeeded.
val wrongReplicaEpoch = defaultBrokerEpoch(remoteBrokerId1) - 1 val wrongReplicaEpoch = defaultBrokerEpoch(remoteBrokerId1) - 1
when(metadataCache.getAliveBrokerEpoch(remoteBrokerId1)).thenReturn(Option(wrongReplicaEpoch), Option(defaultBrokerEpoch(remoteBrokerId1))) when(metadataCache.getAliveBrokerEpoch(remoteBrokerId1)).thenReturn(Option(wrongReplicaEpoch), Option(defaultBrokerEpoch(remoteBrokerId1)))
@ -1905,8 +1894,8 @@ class PartitionTest extends AbstractPartitionTest {
val replicas = List(brokerId, remoteBrokerId1) val replicas = List(brokerId, remoteBrokerId1)
val isr = Set(brokerId, remoteBrokerId1) val isr = Set(brokerId, remoteBrokerId1)
val metadataCache: MetadataCache = mock(classOf[KRaftMetadataCache]) val metadataCache = mock(classOf[KRaftMetadataCache])
addBrokerEpochToMockMetadataCache(metadataCache.asInstanceOf[KRaftMetadataCache], replicas) addBrokerEpochToMockMetadataCache(metadataCache, replicas)
val partition = new Partition( val partition = new Partition(
topicPartition, topicPartition,
@ -2698,71 +2687,6 @@ class PartitionTest extends AbstractPartitionTest {
assertEquals(alterPartitionManager.isrUpdates.size, 1) assertEquals(alterPartitionManager.isrUpdates.size, 1)
} }
@Test
def testZkIsrManagerAsyncCallback(): Unit = {
// We need a real scheduler here so that the ISR write lock works properly
val scheduler = new KafkaScheduler(1, true, "zk-isr-test")
scheduler.startup()
val kafkaZkClient = mock(classOf[KafkaZkClient])
doAnswer(_ => (true, 2))
.when(kafkaZkClient)
.conditionalUpdatePath(anyString(), any(), ArgumentMatchers.eq(1), any())
val zkIsrManager = AlterPartitionManager(scheduler, time, kafkaZkClient)
zkIsrManager.start()
val partition = new Partition(topicPartition,
replicaLagTimeMaxMs = ReplicationConfigs.REPLICA_LAG_TIME_MAX_MS_DEFAULT,
interBrokerProtocolVersion = IBP_2_6_IV0, // shouldn't matter, but set this to a ZK isr version
localBrokerId = brokerId,
() => defaultBrokerEpoch(brokerId),
time,
alterPartitionListener,
delayedOperations,
metadataCache,
logManager,
zkIsrManager)
val log = logManager.getOrCreateLog(topicPartition, topicId = None)
seedLogData(log, numRecords = 10, leaderEpoch = 4)
val controllerEpoch = 0
val leaderEpoch = 5
val follower1 = brokerId + 1
val follower2 = brokerId + 2
val follower3 = brokerId + 3
val replicas = Seq(brokerId, follower1, follower2, follower3)
val isr = Seq(brokerId, follower1, follower2)
doNothing().when(delayedOperations).checkAndCompleteAll()
assertTrue(makeLeader(
partition = partition,
topicId = None,
controllerEpoch = controllerEpoch,
leaderEpoch = leaderEpoch,
isr = isr,
replicas = replicas,
partitionEpoch = 1,
isNew = true
))
assertEquals(0L, partition.localLogOrException.highWatermark)
// Expand ISR
fetchFollower(partition, replicaId = follower3, fetchOffset = 10L)
// Try avoiding a race
TestUtils.waitUntilTrue(() => !partition.partitionState.isInflight, "Expected ISR state to be committed", 100)
partition.partitionState match {
case CommittedPartitionState(isr, _) => assertEquals(Set(brokerId, follower1, follower2, follower3), isr)
case _ => fail("Expected a committed ISR following Zk expansion")
}
scheduler.shutdown()
}
@Test @Test
def testUseCheckpointToInitializeHighWatermark(): Unit = { def testUseCheckpointToInitializeHighWatermark(): Unit = {
val log = logManager.getOrCreateLog(topicPartition, topicId = None) val log = logManager.getOrCreateLog(topicPartition, topicId = None)
@ -2936,7 +2860,7 @@ class PartitionTest extends AbstractPartitionTest {
val partition = new Partition( val partition = new Partition(
topicPartition, 1000, MetadataVersion.latestTesting, 0, () => defaultBrokerEpoch(0), topicPartition, 1000, MetadataVersion.latestTesting, 0, () => defaultBrokerEpoch(0),
Time.SYSTEM, mock(classOf[AlterPartitionListener]), mock(classOf[DelayedOperations]), Time.SYSTEM, mock(classOf[AlterPartitionListener]), mock(classOf[DelayedOperations]),
mock(classOf[MetadataCache]), mock(classOf[LogManager]), mock(classOf[AlterPartitionManager])) mock(classOf[KRaftMetadataCache]), mock(classOf[LogManager]), mock(classOf[AlterPartitionManager]))
val replicas = Seq(0, 1, 2, 3) val replicas = Seq(0, 1, 2, 3)
val followers = Seq(1, 2, 3) val followers = Seq(1, 2, 3)
@ -3200,7 +3124,7 @@ class PartitionTest extends AbstractPartitionTest {
assertEquals(Some(0L), partition.leaderEpochStartOffsetOpt) assertEquals(Some(0L), partition.leaderEpochStartOffsetOpt)
val leaderLog = partition.localLogOrException val leaderLog = partition.localLogOrException
assertEquals(Optional.of(new EpochEntry(leaderEpoch, 0L)), leaderLog.leaderEpochCache.asJava.flatMap(_.latestEntry)) assertEquals(Optional.of(new EpochEntry(leaderEpoch, 0L)), leaderLog.leaderEpochCache.toJava.flatMap(_.latestEntry))
// Write to the log to increment the log end offset. // Write to the log to increment the log end offset.
leaderLog.appendAsLeader(MemoryRecords.withRecords(0L, Compression.NONE, 0, leaderLog.appendAsLeader(MemoryRecords.withRecords(0L, Compression.NONE, 0,
@ -3224,7 +3148,7 @@ class PartitionTest extends AbstractPartitionTest {
assertEquals(leaderEpoch, partition.getLeaderEpoch) assertEquals(leaderEpoch, partition.getLeaderEpoch)
assertEquals(Set(leaderId), partition.partitionState.isr) assertEquals(Set(leaderId), partition.partitionState.isr)
assertEquals(Some(0L), partition.leaderEpochStartOffsetOpt) assertEquals(Some(0L), partition.leaderEpochStartOffsetOpt)
assertEquals(Optional.of(new EpochEntry(leaderEpoch, 0L)), leaderLog.leaderEpochCache.asJava.flatMap(_.latestEntry)) assertEquals(Optional.of(new EpochEntry(leaderEpoch, 0L)), leaderLog.leaderEpochCache.toJava.flatMap(_.latestEntry))
} }
@Test @Test
@ -3773,8 +3697,8 @@ class PartitionTest extends AbstractPartitionTest {
fetchOffset, fetchOffset,
FetchRequest.INVALID_LOG_START_OFFSET, FetchRequest.INVALID_LOG_START_OFFSET,
maxBytes, maxBytes,
leaderEpoch.map(Int.box).asJava, leaderEpoch.map(Int.box).toJava,
lastFetchedEpoch.map(Int.box).asJava lastFetchedEpoch.map(Int.box).toJava
) )
partition.fetchRecords( partition.fetchRecords(
@ -3810,8 +3734,8 @@ class PartitionTest extends AbstractPartitionTest {
fetchOffset, fetchOffset,
logStartOffset, logStartOffset,
maxBytes, maxBytes,
leaderEpoch.map(Int.box).asJava, leaderEpoch.map(Int.box).toJava,
lastFetchedEpoch.map(Int.box).asJava lastFetchedEpoch.map(Int.box).toJava
) )
partition.fetchRecords( partition.fetchRecords(

View File

@ -19,11 +19,10 @@ package kafka.server
import java.util.Collections import java.util.Collections
import java.util.stream.{Stream => JStream} import java.util.stream.{Stream => JStream}
import kafka.zk.KafkaZkClient
import org.apache.kafka.clients.ClientResponse import org.apache.kafka.clients.ClientResponse
import org.apache.kafka.common.TopicIdPartition import org.apache.kafka.common.TopicIdPartition
import org.apache.kafka.common.Uuid import org.apache.kafka.common.Uuid
import org.apache.kafka.common.errors.{AuthenticationException, InvalidUpdateVersionException, OperationNotAttemptedException, UnknownServerException, UnsupportedVersionException} import org.apache.kafka.common.errors.{AuthenticationException, OperationNotAttemptedException, UnknownServerException, UnsupportedVersionException}
import org.apache.kafka.common.message.AlterPartitionRequestData.BrokerState import org.apache.kafka.common.message.AlterPartitionRequestData.BrokerState
import org.apache.kafka.common.message.{AlterPartitionRequestData, AlterPartitionResponseData} import org.apache.kafka.common.message.{AlterPartitionRequestData, AlterPartitionResponseData}
import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.metrics.Metrics
@ -43,7 +42,7 @@ import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.Arguments import org.junit.jupiter.params.provider.Arguments
import org.junit.jupiter.params.provider.MethodSource import org.junit.jupiter.params.provider.MethodSource
import org.mockito.ArgumentMatcher import org.mockito.ArgumentMatcher
import org.mockito.ArgumentMatchers.{any, anyString} import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito.{mock, reset, times, verify} import org.mockito.Mockito.{mock, reset, times, verify}
import org.mockito.{ArgumentCaptor, ArgumentMatchers, Mockito} import org.mockito.{ArgumentCaptor, ArgumentMatchers, Mockito}
@ -629,33 +628,6 @@ class AlterPartitionManagerTest {
.setErrorCode(error.code))) .setErrorCode(error.code)))
} }
@Test
def testZkBasic(): Unit = {
val scheduler = new MockScheduler(time)
scheduler.startup()
val kafkaZkClient = Mockito.mock(classOf[KafkaZkClient])
Mockito.doAnswer(_ => (true, 2))
.when(kafkaZkClient)
.conditionalUpdatePath(anyString(), any(), ArgumentMatchers.eq(1), any())
Mockito.doAnswer(_ => (false, 2))
.when(kafkaZkClient)
.conditionalUpdatePath(anyString(), any(), ArgumentMatchers.eq(3), any())
val zkIsrManager = new ZkAlterPartitionManager(scheduler, time, kafkaZkClient)
zkIsrManager.start()
// Correct ZK version
val future1 = zkIsrManager.submit(tp0, new LeaderAndIsr(1, 1, List(1, 2, 3).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 1), 0)
assertTrue(future1.isDone)
assertEquals(new LeaderAndIsr(1, 1, List(1, 2, 3).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 2), future1.get)
// Wrong ZK version
val future2 = zkIsrManager.submit(tp0, new LeaderAndIsr(1, 1, List(1, 2, 3).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 3), 0)
assertTrue(future2.isCompletedExceptionally)
assertFutureThrows(future2, classOf[InvalidUpdateVersionException])
}
private def partitionResponse( private def partitionResponse(
tp: TopicIdPartition = tp0, tp: TopicIdPartition = tp0,
error: Errors = Errors.NONE, error: Errors = Errors.NONE,