KAFKA-10825 ZooKeeper ISR manager (#9713)

ISR-related cleanup in ReplicaManager and Partition. Removes the ISR change logic from ReplicaManager and adds a new ZkIsrManager class that implements a new AlterIsrManager trait. Unifies all ISR update logic in Partition so there are no longer separate code paths for ZK vs. AlterIsr. Also removes PartitionStateStore, replacing its topic config lookup with a TopicConfigFetcher.
David Arthur 2020-12-21 14:44:02 -05:00 committed by GitHub
parent 300909d9e6
commit d3f19e4bb0
17 changed files with 426 additions and 278 deletions
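
To make the shape of the refactor concrete, here is a small self-contained sketch (simplified stand-in types and illustrative names, not the real Kafka classes) of the unified path, condensed from the hunks below: Partition-side code now builds a single AlterIsrItem and hands it to whichever AlterIsrManager implementation the broker constructed, reverting its in-memory state only if the manager rejects the submission.

object IsrFlowSketch extends App {
  // Stand-in types; the real ones live in kafka.api and kafka.server.
  case class LeaderAndIsr(leader: Int, leaderEpoch: Int, isr: List[Int], zkVersion: Int)
  case class AlterIsrItem(partition: String,
                          leaderAndIsr: LeaderAndIsr,
                          callback: Either[String, LeaderAndIsr] => Unit,
                          controllerEpoch: Int)

  trait AlterIsrManager {
    def start(): Unit
    def submit(item: AlterIsrItem): Boolean
    def clearPending(partition: String): Unit
  }

  // Toy stand-in for ZkIsrManager: applies the update and invokes the callback synchronously.
  class ToyZkIsrManager extends AlterIsrManager {
    override def start(): Unit = ()
    override def submit(item: AlterIsrItem): Boolean = {
      item.callback(Right(item.leaderAndIsr.copy(zkVersion = item.leaderAndIsr.zkVersion + 1)))
      true
    }
    override def clearPending(partition: String): Unit = ()
  }

  // Partition-side usage: no ZK-vs-AlterIsr branching, just one submit call.
  val manager: AlterIsrManager = new ToyZkIsrManager
  val proposed = LeaderAndIsr(leader = 0, leaderEpoch = 5, isr = List(0, 1, 2, 3), zkVersion = 1)
  val item = AlterIsrItem("test-topic-0", proposed, result => println(s"ISR update result: $result"), controllerEpoch = 0)
  if (!manager.submit(item)) {
    println("submit rejected: revert in-memory ISR state and mark the update failed")
  }
}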


@ -28,7 +28,7 @@ import kafka.server._
import kafka.server.checkpoints.OffsetCheckpoints
import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
import kafka.utils._
import kafka.zk.{AdminZkClient, KafkaZkClient}
import kafka.zk.AdminZkClient
import kafka.zookeeper.ZooKeeperClientException
import org.apache.kafka.common.errors._
import org.apache.kafka.common.message.FetchResponseData
@ -51,40 +51,8 @@ trait IsrChangeListener {
def markFailed(): Unit
}
trait PartitionStateStore {
def fetchTopicConfig(): Properties
def shrinkIsr(controllerEpoch: Int, leaderAndIsr: LeaderAndIsr): Option[Int]
def expandIsr(controllerEpoch: Int, leaderAndIsr: LeaderAndIsr): Option[Int]
}
class ZkPartitionStateStore(topicPartition: TopicPartition,
zkClient: KafkaZkClient) extends PartitionStateStore {
override def fetchTopicConfig(): Properties = {
val adminZkClient = new AdminZkClient(zkClient)
adminZkClient.fetchEntityConfig(ConfigType.Topic, topicPartition.topic)
}
override def shrinkIsr(controllerEpoch: Int, leaderAndIsr: LeaderAndIsr): Option[Int] = {
val newVersionOpt = updateIsr(controllerEpoch, leaderAndIsr)
newVersionOpt
}
override def expandIsr(controllerEpoch: Int, leaderAndIsr: LeaderAndIsr): Option[Int] = {
val newVersionOpt = updateIsr(controllerEpoch, leaderAndIsr)
newVersionOpt
}
private def updateIsr(controllerEpoch: Int, leaderAndIsr: LeaderAndIsr): Option[Int] = {
val (updateSucceeded, newVersion) = ReplicationUtils.updateLeaderAndIsr(zkClient, topicPartition,
leaderAndIsr, controllerEpoch)
if (updateSucceeded) {
Some(newVersion)
} else {
None
}
}
trait TopicConfigFetcher {
def fetch(): Properties
}
class DelayedOperations(topicPartition: TopicPartition,
@ -109,21 +77,22 @@ object Partition extends KafkaMetricsGroup {
val isrChangeListener = new IsrChangeListener {
override def markExpand(): Unit = {
replicaManager.recordIsrChange(topicPartition)
replicaManager.isrExpandRate.mark()
}
override def markShrink(): Unit = {
replicaManager.recordIsrChange(topicPartition)
replicaManager.isrShrinkRate.mark()
}
override def markFailed(): Unit = replicaManager.failedIsrUpdatesRate.mark()
}
val zkIsrBackingStore = new ZkPartitionStateStore(
topicPartition,
replicaManager.zkClient)
val configProvider = new TopicConfigFetcher {
override def fetch(): Properties = {
val adminZkClient = new AdminZkClient(replicaManager.zkClient)
adminZkClient.fetchEntityConfig(ConfigType.Topic, topicPartition.topic)
}
}
val delayedOperations = new DelayedOperations(
topicPartition,
@ -136,7 +105,7 @@ object Partition extends KafkaMetricsGroup {
interBrokerProtocolVersion = replicaManager.config.interBrokerProtocolVersion,
localBrokerId = replicaManager.config.brokerId,
time = time,
stateStore = zkIsrBackingStore,
topicConfigProvider = configProvider,
isrChangeListener = isrChangeListener,
delayedOperations = delayedOperations,
metadataCache = replicaManager.metadataCache,
@ -259,7 +228,7 @@ class Partition(val topicPartition: TopicPartition,
interBrokerProtocolVersion: ApiVersion,
localBrokerId: Int,
time: Time,
stateStore: PartitionStateStore,
topicConfigProvider: TopicConfigFetcher,
isrChangeListener: IsrChangeListener,
delayedOperations: DelayedOperations,
metadataCache: MetadataCache,
@ -285,8 +254,6 @@ class Partition(val topicPartition: TopicPartition,
@volatile private[cluster] var isrState: IsrState = CommittedIsr(Set.empty)
@volatile var assignmentState: AssignmentState = SimpleAssignmentState(Seq.empty)
private val useAlterIsr: Boolean = interBrokerProtocolVersion.isAlterIsrSupported
// Logs belonging to this partition. Most of the time there will be only one log, but if the log directory
// is being changed (as a result of a ReplicaAlterLogDirs command), we may have two logs until the copy
// completes and the switch to the new location is performed.
@ -375,7 +342,7 @@ class Partition(val topicPartition: TopicPartition,
// Visible for testing
private[cluster] def createLog(isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints): Log = {
def fetchLogConfig: LogConfig = {
val props = stateStore.fetchTopicConfig()
val props = topicConfigProvider.fetch()
LogConfig.fromProps(logManager.currentDefaultConfig.originals, props)
}
@ -1325,14 +1292,6 @@ class Partition(val topicPartition: TopicPartition,
}
private[cluster] def expandIsr(newInSyncReplica: Int): Unit = {
if (useAlterIsr) {
expandIsrWithAlterIsr(newInSyncReplica)
} else {
expandIsrWithZk(newInSyncReplica)
}
}
private def expandIsrWithAlterIsr(newInSyncReplica: Int): Unit = {
// This is called from maybeExpandIsr which holds the ISR write lock
if (!isrState.isInflight) {
// When expanding the ISR, we can safely assume the new replica will make it into the ISR since this puts us in
@ -1343,26 +1302,7 @@ class Partition(val topicPartition: TopicPartition,
}
}
private def expandIsrWithZk(newInSyncReplica: Int): Unit = {
val newInSyncReplicaIds = isrState.isr + newInSyncReplica
info(s"Expanding ISR from ${isrState.isr.mkString(",")} to ${newInSyncReplicaIds.mkString(",")}")
val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newInSyncReplicaIds.toList, zkVersion)
val zkVersionOpt = stateStore.expandIsr(controllerEpoch, newLeaderAndIsr)
if (zkVersionOpt.isDefined) {
isrChangeListener.markExpand()
}
maybeUpdateIsrAndVersionWithZk(newInSyncReplicaIds, zkVersionOpt)
}
private[cluster] def shrinkIsr(outOfSyncReplicas: Set[Int]): Unit = {
if (useAlterIsr) {
shrinkIsrWithAlterIsr(outOfSyncReplicas)
} else {
shrinkIsrWithZk(isrState.isr -- outOfSyncReplicas)
}
}
private def shrinkIsrWithAlterIsr(outOfSyncReplicas: Set[Int]): Unit = {
// This is called from maybeShrinkIsr which holds the ISR write lock
if (!isrState.isInflight) {
// When shrinking the ISR, we cannot assume that the update will succeed as this could erroneously advance the HW
@ -1374,47 +1314,29 @@ class Partition(val topicPartition: TopicPartition,
}
}
private def shrinkIsrWithZk(newIsr: Set[Int]): Unit = {
val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.toList, zkVersion)
val zkVersionOpt = stateStore.shrinkIsr(controllerEpoch, newLeaderAndIsr)
if (zkVersionOpt.isDefined) {
isrChangeListener.markShrink()
}
maybeUpdateIsrAndVersionWithZk(newIsr, zkVersionOpt)
}
private def maybeUpdateIsrAndVersionWithZk(isr: Set[Int], zkVersionOpt: Option[Int]): Unit = {
zkVersionOpt match {
case Some(newVersion) =>
isrState = CommittedIsr(isr)
zkVersion = newVersion
info("ISR updated to [%s] and zkVersion updated to [%d]".format(isr.mkString(","), zkVersion))
case None =>
info(s"Cached zkVersion $zkVersion not equal to that in zookeeper, skip updating ISR")
isrChangeListener.markFailed()
}
}
private def sendAlterIsrRequest(proposedIsrState: IsrState): Unit = {
val isrToSend: Set[Int] = proposedIsrState match {
case PendingExpandIsr(isr, newInSyncReplicaId) => isr + newInSyncReplicaId
case PendingShrinkIsr(isr, outOfSyncReplicaIds) => isr -- outOfSyncReplicaIds
case state =>
throw new IllegalStateException(s"Invalid state $state for `AlterIsr` request for partition $topicPartition")
isrChangeListener.markFailed()
throw new IllegalStateException(s"Invalid state $state for ISR change for partition $topicPartition")
}
val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, isrToSend.toList, zkVersion)
val alterIsrItem = AlterIsrItem(topicPartition, newLeaderAndIsr, handleAlterIsrResponse(proposedIsrState))
val alterIsrItem = AlterIsrItem(topicPartition, newLeaderAndIsr, handleAlterIsrResponse(proposedIsrState), controllerEpoch)
if (!alterIsrManager.enqueue(alterIsrItem)) {
val oldState = isrState
isrState = proposedIsrState
if (!alterIsrManager.submit(alterIsrItem)) {
// If the ISR manager did not accept our update, we need to revert back to previous state
isrState = oldState
isrChangeListener.markFailed()
throw new IllegalStateException(s"Failed to enqueue `AlterIsr` request with state " +
s"$newLeaderAndIsr for partition $topicPartition")
throw new IllegalStateException(s"Failed to enqueue ISR change state $newLeaderAndIsr for partition $topicPartition")
}
isrState = proposedIsrState
debug(s"Sent `AlterIsr` request to change state to $newLeaderAndIsr after transition to $proposedIsrState")
debug(s"Enqueued ISR change to state $newLeaderAndIsr after transition to $proposedIsrState")
}
/**
@ -1438,27 +1360,27 @@ class Partition(val topicPartition: TopicPartition,
isrChangeListener.markFailed()
error match {
case Errors.UNKNOWN_TOPIC_OR_PARTITION =>
debug(s"Controller failed to update ISR to $proposedIsrState since it doesn't know about this topic or partition. Giving up.")
debug(s"Failed to update ISR to $proposedIsrState since it doesn't know about this topic or partition. Giving up.")
case Errors.FENCED_LEADER_EPOCH =>
debug(s"Controller failed to update ISR to $proposedIsrState since we sent an old leader epoch. Giving up.")
debug(s"Failed to update ISR to $proposedIsrState since we sent an old leader epoch. Giving up.")
case Errors.INVALID_UPDATE_VERSION =>
debug(s"Controller failed to update ISR to $proposedIsrState due to invalid zk version. Giving up.")
debug(s"Failed to update ISR to $proposedIsrState due to invalid zk version. Giving up.")
case _ =>
warn(s"Controller failed to update ISR to $proposedIsrState due to unexpected $error. Retrying.")
warn(s"Failed to update ISR to $proposedIsrState due to unexpected $error. Retrying.")
sendAlterIsrRequest(proposedIsrState)
}
case Right(leaderAndIsr: LeaderAndIsr) =>
// Success from controller, still need to check a few things
if (leaderAndIsr.leaderEpoch != leaderEpoch) {
debug(s"Ignoring ISR from AlterIsr with ${leaderAndIsr} since we have a stale leader epoch $leaderEpoch.")
debug(s"Ignoring new ISR ${leaderAndIsr} since we have a stale leader epoch $leaderEpoch.")
isrChangeListener.markFailed()
} else if (leaderAndIsr.zkVersion <= zkVersion) {
debug(s"Ignoring ISR from AlterIsr with ${leaderAndIsr} since we have a newer version $zkVersion.")
debug(s"Ignoring new ISR ${leaderAndIsr} since we have a newer version $zkVersion.")
isrChangeListener.markFailed()
} else {
isrState = CommittedIsr(leaderAndIsr.isr.toSet)
zkVersion = leaderAndIsr.zkVersion
info(s"ISR updated from AlterIsr to ${isrState.isr.mkString(",")} and version updated to [$zkVersion]")
info(s"ISR updated to ${isrState.isr.mkString(",")} and version updated to [$zkVersion]")
proposedIsrState match {
case PendingExpandIsr(_, _) => isrChangeListener.markExpand()
case PendingShrinkIsr(_, _) => isrChangeListener.markShrink()


@ -23,6 +23,7 @@ import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
import kafka.api.LeaderAndIsr
import kafka.metrics.KafkaMetricsGroup
import kafka.utils.{Logging, Scheduler}
import kafka.zk.KafkaZkClient
import org.apache.kafka.clients.ClientResponse
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.message.{AlterIsrRequestData, AlterIsrResponseData}
@ -35,24 +36,51 @@ import scala.collection.mutable.ListBuffer
import scala.jdk.CollectionConverters._
/**
* Handles the sending of AlterIsr requests to the controller. Updating the ISR is an asynchronous operation,
* so partitions will learn about updates through LeaderAndIsr messages sent from the controller
* Handles updating the ISR by sending AlterIsr requests to the controller (IBP >= 2.7-IV2) or by updating ZooKeeper
* directly (IBP < 2.7-IV2). Updating the ISR is an asynchronous operation, so partitions will learn about the result
* of their request through a callback.
*
* Note that ISR state changes can still be initiated by the controller and sent to the partitions via LeaderAndIsr
* requests.
*/
trait AlterIsrManager {
def start(): Unit
def enqueue(alterIsrItem: AlterIsrItem): Boolean
def submit(alterIsrItem: AlterIsrItem): Boolean
def clearPending(topicPartition: TopicPartition): Unit
}
case class AlterIsrItem(topicPartition: TopicPartition, leaderAndIsr: LeaderAndIsr, callback: Either[Errors, LeaderAndIsr] => Unit)
case class AlterIsrItem(topicPartition: TopicPartition,
leaderAndIsr: LeaderAndIsr,
callback: Either[Errors, LeaderAndIsr] => Unit,
controllerEpoch: Int) // controllerEpoch needed for Zk impl
class AlterIsrManagerImpl(val controllerChannelManager: BrokerToControllerChannelManager,
val scheduler: Scheduler,
val time: Time,
val brokerId: Int,
val brokerEpochSupplier: () => Long) extends AlterIsrManager with Logging with KafkaMetricsGroup {
object AlterIsrManager {
/**
* Factory for the AlterIsr-based implementation, used when IBP >= 2.7-IV2
*/
def apply(controllerChannelManager: BrokerToControllerChannelManager,
scheduler: Scheduler,
time: Time,
brokerId: Int,
brokerEpochSupplier: () => Long): AlterIsrManager = {
new DefaultAlterIsrManager(controllerChannelManager, scheduler, time, brokerId, brokerEpochSupplier)
}
/**
* Factory for the ZK-based implementation, used when IBP < 2.7-IV2
*/
def apply(scheduler: Scheduler, time: Time, zkClient: KafkaZkClient): AlterIsrManager = {
new ZkIsrManager(scheduler, time, zkClient)
}
}
class DefaultAlterIsrManager(val controllerChannelManager: BrokerToControllerChannelManager,
val scheduler: Scheduler,
val time: Time,
val brokerId: Int,
val brokerEpochSupplier: () => Long) extends AlterIsrManager with Logging with KafkaMetricsGroup {
// Used to allow only one pending ISR update per partition
private val unsentIsrUpdates: util.Map[TopicPartition, AlterIsrItem] = new ConcurrentHashMap[TopicPartition, AlterIsrItem]()
@ -66,7 +94,7 @@ class AlterIsrManagerImpl(val controllerChannelManager: BrokerToControllerChanne
scheduler.schedule("send-alter-isr", propagateIsrChanges, 50, 50, TimeUnit.MILLISECONDS)
}
override def enqueue(alterIsrItem: AlterIsrItem): Boolean = {
override def submit(alterIsrItem: AlterIsrItem): Boolean = {
unsentIsrUpdates.putIfAbsent(alterIsrItem.topicPartition, alterIsrItem) == null
}
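
The putIfAbsent call above is what enforces a single pending ISR update per partition in the default (controller-based) manager: a second submit for the same partition returns false until the pending entry is cleared, whether by response handling or clearPending. A minimal standalone illustration of that semantics (toy value type and illustrative name, not the real AlterIsrItem):

object PendingIsrUpdateSketch extends App {
  import java.util.concurrent.ConcurrentHashMap
  import org.apache.kafka.common.TopicPartition

  val unsentIsrUpdates = new ConcurrentHashMap[TopicPartition, String]()
  val tp = new TopicPartition("test-topic", 0)

  println(unsentIsrUpdates.putIfAbsent(tp, "isr=[0,1,2]") == null) // true: accepted
  println(unsentIsrUpdates.putIfAbsent(tp, "isr=[0,1]") == null)   // false: an update is already pending
  unsentIsrUpdates.remove(tp)                                      // response handling / clearPending removes the entry
  println(unsentIsrUpdates.putIfAbsent(tp, "isr=[0,1]") == null)   // true: accepted again
}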


@ -444,8 +444,12 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
}
protected def createReplicaManager(isShuttingDown: AtomicBoolean): ReplicaManager = {
val alterIsrManager = new AlterIsrManagerImpl(alterIsrChannelManager, kafkaScheduler,
time, config.brokerId, () => kafkaController.brokerEpoch)
val alterIsrManager: AlterIsrManager = if (config.interBrokerProtocolVersion.isAlterIsrSupported) {
AlterIsrManager(alterIsrChannelManager, kafkaScheduler,
time, config.brokerId, () => kafkaController.brokerEpoch)
} else {
AlterIsrManager(kafkaScheduler, time, zkClient)
}
new ReplicaManager(config, metrics, time, zkClient, kafkaScheduler, logManager, isShuttingDown, quotaManagers,
brokerTopicStats, metadataCache, logDirFailureChannel, alterIsrManager)
}


@ -19,7 +19,7 @@ package kafka.server
import java.io.File
import java.util.Optional
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.Lock
import com.yammer.metrics.core.Meter
@ -166,26 +166,8 @@ object HostedPartition {
final object Offline extends HostedPartition
}
case class IsrChangePropagationConfig(
// How often to check for ISR
checkIntervalMs: Long,
// Maximum time that an ISR change may be delayed before sending the notification
maxDelayMs: Long,
// Maximum time to await additional changes before sending the notification
lingerMs: Long
)
object ReplicaManager {
val HighWatermarkFilename = "replication-offset-checkpoint"
// This field is mutable to allow overriding change notification behavior in test cases
@volatile var DefaultIsrPropagationConfig: IsrChangePropagationConfig = IsrChangePropagationConfig(
checkIntervalMs = 2500,
lingerMs = 5000,
maxDelayMs = 60000,
)
}
class ReplicaManager(val config: KafkaConfig,
@ -251,11 +233,6 @@ class ReplicaManager(val config: KafkaConfig,
this.logIdent = s"[ReplicaManager broker=$localBrokerId] "
private val stateChangeLogger = new StateChangeLogger(localBrokerId, inControllerContext = false, None)
private val isrChangeNotificationConfig = ReplicaManager.DefaultIsrPropagationConfig
private val isrChangeSet: mutable.Set[TopicPartition] = new mutable.HashSet[TopicPartition]()
private val lastIsrChangeMs = new AtomicLong(time.milliseconds())
private val lastIsrPropagationMs = new AtomicLong(time.milliseconds())
private var logDirFailureHandler: LogDirFailureHandler = null
private class LogDirFailureHandler(name: String, haltBrokerOnDirFailure: Boolean) extends ShutdownableThread(name) {
@ -294,34 +271,6 @@ class ReplicaManager(val config: KafkaConfig,
scheduler.schedule("highwatermark-checkpoint", checkpointHighWatermarks _, period = config.replicaHighWatermarkCheckpointIntervalMs, unit = TimeUnit.MILLISECONDS)
}
def recordIsrChange(topicPartition: TopicPartition): Unit = {
if (!config.interBrokerProtocolVersion.isAlterIsrSupported) {
isrChangeSet synchronized {
isrChangeSet += topicPartition
lastIsrChangeMs.set(time.milliseconds())
}
}
}
/**
* This function periodically runs to see if ISR needs to be propagated. It propagates ISR when:
* 1. There is ISR change not propagated yet.
* 2. There is no ISR Change in the last five seconds, or it has been more than 60 seconds since the last ISR propagation.
* This allows an occasional ISR change to be propagated within a few seconds, and avoids overwhelming controller and
* other brokers when large amount of ISR change occurs.
*/
def maybePropagateIsrChanges(): Unit = {
val now = time.milliseconds()
isrChangeSet synchronized {
if (isrChangeSet.nonEmpty &&
(lastIsrChangeMs.get() + isrChangeNotificationConfig.lingerMs < now ||
lastIsrPropagationMs.get() + isrChangeNotificationConfig.maxDelayMs < now)) {
zkClient.propagateIsrChanges(isrChangeSet)
isrChangeSet.clear()
lastIsrPropagationMs.set(now)
}
}
}
// When ReplicaAlterDirThread finishes replacing a current replica with a future replica, it will
// remove the partition from the partition state map. But it will not close itself even if the
// partition state map is empty. Thus we need to call shutdownIdleReplicaAlterDirThread() periodically
@ -343,14 +292,8 @@ class ReplicaManager(val config: KafkaConfig,
// start ISR expiration thread
// A follower can lag behind leader for up to config.replicaLagTimeMaxMs x 1.5 before it is removed from ISR
scheduler.schedule("isr-expiration", maybeShrinkIsr _, period = config.replicaLagTimeMaxMs / 2, unit = TimeUnit.MILLISECONDS)
// If using AlterIsr, we don't need the znode ISR propagation
if (!config.interBrokerProtocolVersion.isAlterIsrSupported) {
scheduler.schedule("isr-change-propagation", maybePropagateIsrChanges _,
period = isrChangeNotificationConfig.checkIntervalMs, unit = TimeUnit.MILLISECONDS)
} else {
alterIsrManager.start()
}
scheduler.schedule("shutdown-idle-replica-alter-log-dirs-thread", shutdownIdleReplicaAlterLogDirsThread _, period = 10000L, unit = TimeUnit.MILLISECONDS)
alterIsrManager.start()
// If inter-broker protocol (IBP) < 1.0, the controller will send LeaderAndIsrRequest V0 which does not include isNew field.
// In this case, the broker receiving the request cannot determine whether it is safe to create a partition if a log directory has failed.


@ -0,0 +1,109 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server
import kafka.utils.{Logging, ReplicationUtils, Scheduler}
import kafka.zk.KafkaZkClient
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.utils.Time
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicLong
import scala.collection.mutable
/**
* @param checkIntervalMs How often to check for ISR
* @param maxDelayMs Maximum time that an ISR change may be delayed before sending the notification
* @param lingerMs Maximum time to await additional changes before sending the notification
*/
case class IsrChangePropagationConfig(checkIntervalMs: Long, maxDelayMs: Long, lingerMs: Long)
object ZkIsrManager {
// This field is mutable to allow overriding change notification behavior in test cases
@volatile var DefaultIsrPropagationConfig: IsrChangePropagationConfig = IsrChangePropagationConfig(
checkIntervalMs = 2500,
lingerMs = 5000,
maxDelayMs = 60000,
)
}
class ZkIsrManager(scheduler: Scheduler, time: Time, zkClient: KafkaZkClient) extends AlterIsrManager with Logging {
private val isrChangeNotificationConfig = ZkIsrManager.DefaultIsrPropagationConfig
// Visible for testing
private[server] val isrChangeSet: mutable.Set[TopicPartition] = new mutable.HashSet[TopicPartition]()
private val lastIsrChangeMs = new AtomicLong(time.milliseconds())
private val lastIsrPropagationMs = new AtomicLong(time.milliseconds())
override def start(): Unit = {
scheduler.schedule("isr-change-propagation", maybePropagateIsrChanges _,
period = isrChangeNotificationConfig.checkIntervalMs, unit = TimeUnit.MILLISECONDS)
}
override def clearPending(topicPartition: TopicPartition): Unit = {
// Since we always immediately process ZK updates and never actually enqueue anything, there is nothing to
// clear here so this is a no-op. Even if there are changes that have not been propagated, the write to ZK
// has already happened, so we may as well send the notification to the controller.
}
override def submit(alterIsrItem: AlterIsrItem): Boolean = {
debug(s"Writing new ISR ${alterIsrItem.leaderAndIsr.isr} to ZooKeeper with version " +
s"${alterIsrItem.leaderAndIsr.zkVersion} for partition ${alterIsrItem.topicPartition}")
val (updateSucceeded, newVersion) = ReplicationUtils.updateLeaderAndIsr(zkClient, alterIsrItem.topicPartition,
alterIsrItem.leaderAndIsr, alterIsrItem.controllerEpoch)
if (updateSucceeded) {
// Track which partitions need to be propagated to the controller
isrChangeSet synchronized {
isrChangeSet += alterIsrItem.topicPartition
lastIsrChangeMs.set(time.milliseconds())
}
// We rely on Partition#isrState being properly set to the pending ISR at this point since we are synchronously
// applying the callback
alterIsrItem.callback.apply(Right(alterIsrItem.leaderAndIsr.withZkVersion(newVersion)))
} else {
alterIsrItem.callback.apply(Left(Errors.INVALID_UPDATE_VERSION))
}
// Return true since we unconditionally accept the AlterIsrItem. The result of the operation is indicated by the
// callback, not the return value of this method
true
}
/**
* This function runs periodically to check whether ISR changes need to be propagated. It propagates them when:
* 1. There are ISR changes that have not been propagated yet.
* 2. There has been no ISR change in the last five seconds, or more than 60 seconds have passed since the last propagation.
* This allows an occasional ISR change to be propagated within a few seconds while avoiding overwhelming the controller
* and other brokers when a large number of ISR changes occur.
*/
private[server] def maybePropagateIsrChanges(): Unit = {
val now = time.milliseconds()
isrChangeSet synchronized {
if (isrChangeSet.nonEmpty &&
(lastIsrChangeMs.get() + isrChangeNotificationConfig.lingerMs < now ||
lastIsrPropagationMs.get() + isrChangeNotificationConfig.maxDelayMs < now)) {
zkClient.propagateIsrChanges(isrChangeSet)
isrChangeSet.clear()
lastIsrPropagationMs.set(now)
}
}
}
}
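
For callers, the important contract of this ZK path is that submit always returns true and the outcome is delivered synchronously through the callback: Right carrying the new zkVersion when the conditional update succeeds, or Left(INVALID_UPDATE_VERSION) when the cached version is stale. A toy illustration of that contract (stand-in types and illustrative names, no real ZooKeeper client involved):

object ZkSubmitContractSketch extends App {
  case class LeaderAndIsr(leader: Int, leaderEpoch: Int, isr: List[Int], zkVersion: Int)

  // Stand-in for the conditional znode update: it succeeds only when the expected version matches.
  private var znodeVersion = 1
  private def conditionalUpdate(expectedVersion: Int): (Boolean, Int) =
    if (expectedVersion == znodeVersion) { znodeVersion += 1; (true, znodeVersion) }
    else (false, znodeVersion)

  def submit(leaderAndIsr: LeaderAndIsr, callback: Either[String, LeaderAndIsr] => Unit): Boolean = {
    val (updateSucceeded, newVersion) = conditionalUpdate(leaderAndIsr.zkVersion)
    if (updateSucceeded) callback(Right(leaderAndIsr.copy(zkVersion = newVersion)))
    else callback(Left("INVALID_UPDATE_VERSION"))
    true // always accepted; the callback carries the result
  }

  submit(LeaderAndIsr(1, 1, List(1, 2, 3), zkVersion = 1), r => println(s"up-to-date version: $r")) // Right(..., zkVersion = 2)
  submit(LeaderAndIsr(1, 1, List(1, 2, 3), zkVersion = 1), r => println(s"stale version: $r"))      // Left(INVALID_UPDATE_VERSION)
}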


@ -19,10 +19,9 @@ package kafka.admin
import java.io.Closeable
import java.util.{Collections, HashMap, List}
import kafka.admin.ReassignPartitionsCommand._
import kafka.api.KAFKA_2_7_IV1
import kafka.server.{IsrChangePropagationConfig, KafkaConfig, KafkaServer, ReplicaManager}
import kafka.server.{IsrChangePropagationConfig, KafkaConfig, KafkaServer, ZkIsrManager}
import kafka.utils.Implicits._
import kafka.utils.TestUtils
import kafka.zk.{KafkaZkClient, ZooKeeperTestHarness}
@ -88,7 +87,7 @@ class ReassignPartitionsIntegrationTest extends ZooKeeperTestHarness {
// Override change notification settings so that test is not delayed by ISR
// change notification delay
ReplicaManager.DefaultIsrPropagationConfig = IsrChangePropagationConfig(
ZkIsrManager.DefaultIsrPropagationConfig = IsrChangePropagationConfig(
checkIntervalMs = 500,
lingerMs = 100,
maxDelayMs = 500


@ -43,7 +43,7 @@ class AbstractPartitionTest {
var alterIsrManager: MockAlterIsrManager = _
var isrChangeListener: MockIsrChangeListener = _
var logConfig: LogConfig = _
val stateStore: PartitionStateStore = mock(classOf[PartitionStateStore])
var topicConfigProvider: TopicConfigFetcher = _
val delayedOperations: DelayedOperations = mock(classOf[DelayedOperations])
val metadataCache: MetadataCache = mock(classOf[MetadataCache])
val offsetCheckpoints: OffsetCheckpoints = mock(classOf[OffsetCheckpoints])
@ -55,6 +55,7 @@ class AbstractPartitionTest {
val logProps = createLogProperties(Map.empty)
logConfig = LogConfig(logProps)
topicConfigProvider = TestUtils.createTopicConfigProvider(logProps)
tmpDir = TestUtils.tempDir()
logDir1 = TestUtils.randomPartitionLogDir(tmpDir)
@ -70,14 +71,13 @@ class AbstractPartitionTest {
interBrokerProtocolVersion = ApiVersion.latestVersion,
localBrokerId = brokerId,
time,
stateStore,
topicConfigProvider,
isrChangeListener,
delayedOperations,
metadataCache,
logManager,
alterIsrManager)
when(stateStore.fetchTopicConfig()).thenReturn(createLogProperties(Map.empty))
when(offsetCheckpoints.fetch(ArgumentMatchers.anyString, ArgumentMatchers.eq(topicPartition)))
.thenReturn(None)
}


@ -21,7 +21,7 @@ import java.util.Properties
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent._
import kafka.api.{ApiVersion, LeaderAndIsr}
import kafka.api.ApiVersion
import kafka.log._
import kafka.server._
import kafka.server.checkpoints.OffsetCheckpoints
@ -248,7 +248,7 @@ class PartitionLockTest extends Logging {
val leaderEpoch = 1
val brokerId = 0
val topicPartition = new TopicPartition("test-topic", 0)
val stateStore: PartitionStateStore = mock(classOf[PartitionStateStore])
val topicConfigProvider = TestUtils.createTopicConfigProvider(createLogProperties(Map.empty))
val isrChangeListener: IsrChangeListener = mock(classOf[IsrChangeListener])
val delayedOperations: DelayedOperations = mock(classOf[DelayedOperations])
val metadataCache: MetadataCache = mock(classOf[MetadataCache])
@ -261,7 +261,7 @@ class PartitionLockTest extends Logging {
interBrokerProtocolVersion = ApiVersion.latestVersion,
localBrokerId = brokerId,
mockTime,
stateStore,
topicConfigProvider,
isrChangeListener,
delayedOperations,
metadataCache,
@ -282,14 +282,9 @@ class PartitionLockTest extends Logging {
new SlowLog(log, mockTime, appendSemaphore)
}
}
when(stateStore.fetchTopicConfig()).thenReturn(createLogProperties(Map.empty))
when(offsetCheckpoints.fetch(ArgumentMatchers.anyString, ArgumentMatchers.eq(topicPartition)))
.thenReturn(None)
when(stateStore.shrinkIsr(ArgumentMatchers.anyInt, ArgumentMatchers.any[LeaderAndIsr]))
.thenReturn(Some(2))
when(stateStore.expandIsr(ArgumentMatchers.anyInt, ArgumentMatchers.any[LeaderAndIsr]))
.thenReturn(Some(2))
when(alterIsrManager.enqueue(ArgumentMatchers.any[AlterIsrItem]))
when(alterIsrManager.submit(ArgumentMatchers.any[AlterIsrItem]))
.thenReturn(true)
partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)


@ -20,13 +20,14 @@ import java.nio.ByteBuffer
import java.util.Optional
import java.util.concurrent.{CountDownLatch, Semaphore}
import com.yammer.metrics.core.Metric
import kafka.api.{ApiVersion, LeaderAndIsr}
import kafka.api.{ApiVersion, KAFKA_2_6_IV0}
import kafka.common.UnexpectedAppendOffsetException
import kafka.log.{Defaults => _, _}
import kafka.metrics.KafkaYammerMetrics
import kafka.server._
import kafka.server.checkpoints.OffsetCheckpoints
import kafka.utils._
import kafka.zk.KafkaZkClient
import org.apache.kafka.common.errors.{ApiException, NotLeaderOrFollowerException, OffsetNotAvailableException, OffsetOutOfRangeException}
import org.apache.kafka.common.message.FetchResponseData
import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
@ -40,6 +41,7 @@ import org.apache.kafka.common.{IsolationLevel, TopicPartition}
import org.junit.Assert._
import org.junit.Test
import org.mockito.ArgumentMatchers
import org.mockito.ArgumentMatchers.{any, anyString}
import org.mockito.Mockito._
import org.mockito.invocation.InvocationOnMock
import org.scalatest.Assertions.assertThrows
@ -227,7 +229,7 @@ class PartitionTest extends AbstractPartitionTest {
interBrokerProtocolVersion = ApiVersion.latestVersion,
localBrokerId = brokerId,
time,
stateStore,
topicConfigProvider,
isrChangeListener,
delayedOperations,
metadataCache,
@ -606,16 +608,15 @@ class PartitionTest extends AbstractPartitionTest {
}
}
when(stateStore.expandIsr(controllerEpoch, new LeaderAndIsr(leader, leaderEpoch,
List(leader, follower2, follower1), 1)))
.thenReturn(Some(2))
updateFollowerFetchState(follower1, LogOffsetMetadata(0))
updateFollowerFetchState(follower1, LogOffsetMetadata(2))
updateFollowerFetchState(follower2, LogOffsetMetadata(0))
updateFollowerFetchState(follower2, LogOffsetMetadata(2))
// Simulate successful ISR update
alterIsrManager.completeIsrUpdate(2)
// At this point, the leader has gotten 5 writes, but followers have only fetched two
assertEquals(2, partition.localLogOrException.highWatermark)
@ -699,14 +700,13 @@ class PartitionTest extends AbstractPartitionTest {
case Left(e: ApiException) => fail(s"Should have seen OffsetNotAvailableException, saw $e")
}
when(stateStore.expandIsr(controllerEpoch, new LeaderAndIsr(leader, leaderEpoch + 2,
List(leader, follower2, follower1), 5)))
.thenReturn(Some(2))
// Next fetch from replicas, HW is moved up to 5 (ahead of the LEO)
updateFollowerFetchState(follower1, LogOffsetMetadata(5))
updateFollowerFetchState(follower2, LogOffsetMetadata(5))
// Simulate successful ISR update
alterIsrManager.completeIsrUpdate(6)
// Error goes away
fetchOffsetsForTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP, Some(IsolationLevel.READ_UNCOMMITTED)) match {
case Right(Some(offsetAndTimestamp)) => assertEquals(5, offsetAndTimestamp.offset)
@ -1006,7 +1006,7 @@ class PartitionTest extends AbstractPartitionTest {
// Expansion does not affect the ISR
assertEquals("ISR", Set[Integer](leader, follower2), partition.isrState.isr)
assertEquals("ISR", Set[Integer](leader, follower1, follower2), partition.isrState.maximalIsr)
assertEquals("AlterIsr", alterIsrManager.isrUpdates.dequeue().leaderAndIsr.isr.toSet,
assertEquals("AlterIsr", alterIsrManager.isrUpdates.head.leaderAndIsr.isr.toSet,
Set(leader, follower1, follower2))
}
@ -1165,7 +1165,7 @@ class PartitionTest extends AbstractPartitionTest {
leaderEndOffset = 6L)
assertEquals(alterIsrManager.isrUpdates.size, 1)
val isrItem = alterIsrManager.isrUpdates.dequeue()
val isrItem = alterIsrManager.isrUpdates.head
assertEquals(isrItem.leaderAndIsr.isr, List(brokerId, remoteBrokerId))
assertEquals(Set(brokerId), partition.isrState.isr)
assertEquals(Set(brokerId, remoteBrokerId), partition.isrState.maximalIsr)
@ -1173,7 +1173,7 @@ class PartitionTest extends AbstractPartitionTest {
assertEquals(0L, remoteReplica.logStartOffset)
// Complete the ISR expansion
isrItem.callback.apply(Right(new LeaderAndIsr(brokerId, leaderEpoch, List(brokerId, remoteBrokerId), 2)))
alterIsrManager.completeIsrUpdate(2)
assertEquals(Set(brokerId, remoteBrokerId), partition.isrState.isr)
assertEquals(isrChangeListener.expands.get, 1)
@ -1224,8 +1224,7 @@ class PartitionTest extends AbstractPartitionTest {
assertEquals(0L, remoteReplica.logStartOffset)
// Simulate failure callback
val alterIsrItem = alterIsrManager.isrUpdates.dequeue()
alterIsrItem.callback.apply(Left(Errors.INVALID_UPDATE_VERSION))
alterIsrManager.failIsrUpdate(Errors.INVALID_UPDATE_VERSION)
// Still no ISR change
assertEquals(Set(brokerId), partition.inSyncReplicaIds)
@ -1281,7 +1280,7 @@ class PartitionTest extends AbstractPartitionTest {
// Shrink the ISR
partition.maybeShrinkIsr()
assertEquals(alterIsrManager.isrUpdates.size, 1)
assertEquals(alterIsrManager.isrUpdates.dequeue().leaderAndIsr.isr, List(brokerId))
assertEquals(alterIsrManager.isrUpdates.head.leaderAndIsr.isr, List(brokerId))
assertEquals(Set(brokerId, remoteBrokerId), partition.isrState.isr)
assertEquals(Set(brokerId, remoteBrokerId), partition.isrState.maximalIsr)
assertEquals(0L, partition.localLogOrException.highWatermark)
@ -1450,8 +1449,7 @@ class PartitionTest extends AbstractPartitionTest {
assertEquals(0L, partition.localLogOrException.highWatermark)
// Simulate failure callback
val alterIsrItem = alterIsrManager.isrUpdates.dequeue()
alterIsrItem.callback.apply(Left(Errors.INVALID_UPDATE_VERSION))
alterIsrManager.failIsrUpdate(Errors.INVALID_UPDATE_VERSION)
// Ensure ISR hasn't changed
assertEquals(partition.isrState.getClass, classOf[PendingShrinkIsr])
@ -1534,7 +1532,7 @@ class PartitionTest extends AbstractPartitionTest {
assertEquals(0L, remoteReplica.logStartOffset)
// Failure
alterIsrManager.isrUpdates.dequeue().callback(Left(error))
alterIsrManager.failIsrUpdate(error)
callback(brokerId, remoteBrokerId, partition)
}
@ -1581,6 +1579,74 @@ class PartitionTest extends AbstractPartitionTest {
assertEquals(alterIsrManager.isrUpdates.size, 1)
}
@Test
def testZkIsrManagerAsyncCallback(): Unit = {
// We need a real scheduler here so that the ISR write lock works properly
val scheduler = new KafkaScheduler(1, "zk-isr-test")
scheduler.startup()
val kafkaZkClient = mock(classOf[KafkaZkClient])
doAnswer(_ => (true, 2))
.when(kafkaZkClient)
.conditionalUpdatePath(anyString(), any(), ArgumentMatchers.eq(1), any())
val zkIsrManager = AlterIsrManager(scheduler, time, kafkaZkClient)
zkIsrManager.start()
val partition = new Partition(topicPartition,
replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
interBrokerProtocolVersion = KAFKA_2_6_IV0, // the IBP no longer selects the ISR path in Partition, but use a pre-AlterIsr version for clarity
localBrokerId = brokerId,
time,
topicConfigProvider,
isrChangeListener,
delayedOperations,
metadataCache,
logManager,
zkIsrManager)
val log = logManager.getOrCreateLog(topicPartition, () => logConfig)
seedLogData(log, numRecords = 10, leaderEpoch = 4)
val controllerEpoch = 0
val leaderEpoch = 5
val follower1 = brokerId + 1
val follower2 = brokerId + 2
val follower3 = brokerId + 3
val replicas = List[Integer](brokerId, follower1, follower2, follower3).asJava
val isr = List[Integer](brokerId, follower1, follower2).asJava
doNothing().when(delayedOperations).checkAndCompleteAll()
partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
assertTrue("Expected become leader transition to succeed",
partition.makeLeader(
new LeaderAndIsrPartitionState()
.setControllerEpoch(controllerEpoch)
.setLeader(brokerId)
.setLeaderEpoch(leaderEpoch)
.setIsr(isr)
.setZkVersion(1)
.setReplicas(replicas)
.setIsNew(true),
offsetCheckpoints))
assertEquals(Set(brokerId, follower1, follower2), partition.isrState.isr)
assertEquals(0L, partition.localLogOrException.highWatermark)
// Expand ISR
partition.expandIsr(follower3)
// Wait for the ISR update callback to be applied before asserting, to avoid a race
TestUtils.waitUntilTrue(() => !partition.isrState.isInflight, "Expected ISR state to be committed", 100)
partition.isrState match {
case committed: CommittedIsr => assertEquals(Set(brokerId, follower1, follower2, follower3), committed.isr)
case _ => fail("Expected a committed ISR following Zk expansion")
}
scheduler.shutdown()
}
@Test
def testUseCheckpointToInitializeHighWatermark(): Unit = {
val log = logManager.getOrCreateLog(topicPartition, () => logConfig)
@ -1653,7 +1719,7 @@ class PartitionTest extends AbstractPartitionTest {
val topicPartition = new TopicPartition("test", 1)
val partition = new Partition(
topicPartition, 1000, ApiVersion.latestVersion, 0,
new SystemTime(), mock(classOf[PartitionStateStore]), mock(classOf[IsrChangeListener]), mock(classOf[DelayedOperations]),
new SystemTime(), topicConfigProvider, mock(classOf[IsrChangeListener]), mock(classOf[DelayedOperations]),
mock(classOf[MetadataCache]), mock(classOf[LogManager]), mock(classOf[AlterIsrManager]))
val replicas = Seq(0, 1, 2, 3)
@ -1690,12 +1756,13 @@ class PartitionTest extends AbstractPartitionTest {
@Test
def testLogConfigNotDirty(): Unit = {
val spyLogManager = spy(logManager)
val spyConfigProvider = spy(topicConfigProvider)
val partition = new Partition(topicPartition,
replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
interBrokerProtocolVersion = ApiVersion.latestVersion,
localBrokerId = brokerId,
time,
stateStore,
spyConfigProvider,
isrChangeListener,
delayedOperations,
metadataCache,
@ -1711,7 +1778,7 @@ class PartitionTest extends AbstractPartitionTest {
ArgumentMatchers.any()) // This doesn't get evaluated, but needed to satisfy compilation
// We should fetch the topic config only once
verify(stateStore).fetchTopicConfig()
verify(spyConfigProvider, times(1)).fetch()
}
/**
@ -1720,6 +1787,7 @@ class PartitionTest extends AbstractPartitionTest {
*/
@Test
def testLogConfigDirtyAsTopicUpdated(): Unit = {
val spyConfigProvider = spy(topicConfigProvider)
val spyLogManager = spy(logManager)
doAnswer((invocation: InvocationOnMock) => {
logManager.initializingLog(topicPartition)
@ -1731,7 +1799,7 @@ class PartitionTest extends AbstractPartitionTest {
interBrokerProtocolVersion = ApiVersion.latestVersion,
localBrokerId = brokerId,
time,
stateStore,
spyConfigProvider,
isrChangeListener,
delayedOperations,
metadataCache,
@ -1748,7 +1816,7 @@ class PartitionTest extends AbstractPartitionTest {
// We should fetch the topic config twice: once before the log is created, and a second time when we
// find the log config is dirty and refresh it.
verify(stateStore, times(2)).fetchTopicConfig()
verify(spyConfigProvider, times(2)).fetch()
}
/**
@ -1757,6 +1825,7 @@ class PartitionTest extends AbstractPartitionTest {
*/
@Test
def testLogConfigDirtyAsBrokerUpdated(): Unit = {
val spyConfigProvider = spy(topicConfigProvider)
val spyLogManager = spy(logManager)
doAnswer((invocation: InvocationOnMock) => {
logManager.initializingLog(topicPartition)
@ -1768,7 +1837,7 @@ class PartitionTest extends AbstractPartitionTest {
interBrokerProtocolVersion = ApiVersion.latestVersion,
localBrokerId = brokerId,
time,
stateStore,
spyConfigProvider,
isrChangeListener,
delayedOperations,
metadataCache,
@ -1785,7 +1854,7 @@ class PartitionTest extends AbstractPartitionTest {
// We should fetch the topic config twice: once before the log is created, and a second time when we
// find the log config is dirty and refresh it.
verify(stateStore, times(2)).fetchTopicConfig()
verify(spyConfigProvider, times(2)).fetch()
}
private def seedLogData(log: Log, numRecords: Int, leaderEpoch: Int): Unit = {


@ -19,10 +19,10 @@ package unit.kafka.server
import java.util.Collections
import java.util.concurrent.atomic.AtomicInteger
import kafka.api.LeaderAndIsr
import kafka.server.{AlterIsrItem, AlterIsrManager, AlterIsrManagerImpl, BrokerToControllerChannelManager, ControllerRequestCompletionHandler}
import kafka.server.{AlterIsrItem, AlterIsrManager, BrokerToControllerChannelManager, ControllerRequestCompletionHandler, DefaultAlterIsrManager, ZkIsrManager}
import kafka.utils.{MockScheduler, MockTime}
import kafka.zk.KafkaZkClient
import org.apache.kafka.clients.ClientResponse
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.message.AlterIsrResponseData
@ -32,6 +32,8 @@ import org.apache.kafka.common.requests.{AbstractRequest, AlterIsrRequest, Alter
import org.easymock.EasyMock
import org.junit.Assert._
import org.junit.{Before, Test}
import org.mockito.ArgumentMatchers.{any, anyString}
import org.mockito.{ArgumentMatchers, Mockito}
class AlterIsrManagerTest {
@ -59,9 +61,9 @@ class AlterIsrManagerTest {
EasyMock.replay(brokerToController)
val scheduler = new MockScheduler(time)
val alterIsrManager = new AlterIsrManagerImpl(brokerToController, scheduler, time, brokerId, () => 2)
val alterIsrManager = new DefaultAlterIsrManager(brokerToController, scheduler, time, brokerId, () => 2)
alterIsrManager.start()
alterIsrManager.enqueue(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, List(1,2,3), 10), _ => {}))
alterIsrManager.submit(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, List(1,2,3), 10), _ => {}, 0))
time.sleep(50)
scheduler.tick()
@ -75,12 +77,12 @@ class AlterIsrManagerTest {
EasyMock.replay(brokerToController)
val scheduler = new MockScheduler(time)
val alterIsrManager = new AlterIsrManagerImpl(brokerToController, scheduler, time, brokerId, () => 2)
val alterIsrManager = new DefaultAlterIsrManager(brokerToController, scheduler, time, brokerId, () => 2)
alterIsrManager.start()
// Only send one ISR update for a given topic+partition
assertTrue(alterIsrManager.enqueue(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, List(1,2,3), 10), _ => {})))
assertFalse(alterIsrManager.enqueue(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, List(1,2), 10), _ => {})))
assertTrue(alterIsrManager.submit(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, List(1,2,3), 10), _ => {}, 0)))
assertFalse(alterIsrManager.submit(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, List(1,2), 10), _ => {}, 0)))
time.sleep(50)
scheduler.tick()
@ -99,12 +101,12 @@ class AlterIsrManagerTest {
EasyMock.replay(brokerToController)
val scheduler = new MockScheduler(time)
val alterIsrManager = new AlterIsrManagerImpl(brokerToController, scheduler, time, brokerId, () => 2)
val alterIsrManager = new DefaultAlterIsrManager(brokerToController, scheduler, time, brokerId, () => 2)
alterIsrManager.start()
for (i <- 0 to 9) {
alterIsrManager.enqueue(AlterIsrItem(new TopicPartition(topic, i),
new LeaderAndIsr(1, 1, List(1,2,3), 10), _ => {}))
alterIsrManager.submit(AlterIsrItem(new TopicPartition(topic, i),
new LeaderAndIsr(1, 1, List(1,2,3), 10), _ => {}, 0))
time.sleep(1)
}
@ -112,8 +114,8 @@ class AlterIsrManagerTest {
scheduler.tick()
// This should not be included in the batch
alterIsrManager.enqueue(AlterIsrItem(new TopicPartition(topic, 10),
new LeaderAndIsr(1, 1, List(1,2,3), 10), _ => {}))
alterIsrManager.submit(AlterIsrItem(new TopicPartition(topic, 10),
new LeaderAndIsr(1, 1, List(1,2,3), 10), _ => {}, 0))
EasyMock.verify(brokerToController)
@ -124,26 +126,26 @@ class AlterIsrManagerTest {
@Test
def testAuthorizationFailed(): Unit = {
val isrs = Seq(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, List(1,2,3), 10), _ => { }))
val isrs = Seq(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, List(1,2,3), 10), _ => { }, 0))
val manager = testTopLevelError(isrs, Errors.CLUSTER_AUTHORIZATION_FAILED)
// On authz error, we log the exception and keep retrying
assertFalse(manager.enqueue(AlterIsrItem(tp0, null, _ => { })))
assertFalse(manager.submit(AlterIsrItem(tp0, null, _ => { }, 0)))
}
@Test
def testStaleBrokerEpoch(): Unit = {
val isrs = Seq(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, List(1,2,3), 10), _ => { }))
val isrs = Seq(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, List(1,2,3), 10), _ => { }, 0))
val manager = testTopLevelError(isrs, Errors.STALE_BROKER_EPOCH)
// On stale broker epoch, we want to retry, so we don't clear items from the pending map
assertFalse(manager.enqueue(AlterIsrItem(tp0, null, _ => { })))
assertFalse(manager.submit(AlterIsrItem(tp0, null, _ => { }, 0)))
}
@Test
def testOtherErrors(): Unit = {
val isrs = Seq(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, List(1,2,3), 10), _ => { }))
val isrs = Seq(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, List(1,2,3), 10), _ => { }, 0))
val manager = testTopLevelError(isrs, Errors.UNKNOWN_SERVER_ERROR)
// On other unexpected errors, we also want to retry
assertFalse(manager.enqueue(AlterIsrItem(tp0, null, _ => { })))
assertFalse(manager.submit(AlterIsrItem(tp0, null, _ => { }, 0)))
}
def testTopLevelError(isrs: Seq[AlterIsrItem], error: Errors): AlterIsrManager = {
@ -153,9 +155,9 @@ class AlterIsrManagerTest {
EasyMock.replay(brokerToController)
val scheduler = new MockScheduler(time)
val alterIsrManager = new AlterIsrManagerImpl(brokerToController, scheduler, time, brokerId, () => 2)
val alterIsrManager = new DefaultAlterIsrManager(brokerToController, scheduler, time, brokerId, () => 2)
alterIsrManager.start()
isrs.foreach(alterIsrManager.enqueue)
isrs.foreach(alterIsrManager.submit)
time.sleep(100)
scheduler.tick()
@ -175,7 +177,7 @@ class AlterIsrManagerTest {
errors.foreach(error => {
val alterIsrManager = testPartitionError(tp0, error)
// Any partition-level error should clear the item from the pending queue allowing for future updates
assertTrue(alterIsrManager.enqueue(AlterIsrItem(tp0, null, _ => { })))
assertTrue(alterIsrManager.submit(AlterIsrItem(tp0, null, _ => { }, 0)))
})
}
@ -186,7 +188,7 @@ class AlterIsrManagerTest {
EasyMock.replay(brokerToController)
val scheduler = new MockScheduler(time)
val alterIsrManager = new AlterIsrManagerImpl(brokerToController, scheduler, time, brokerId, () => 2)
val alterIsrManager = new DefaultAlterIsrManager(brokerToController, scheduler, time, brokerId, () => 2)
alterIsrManager.start()
var capturedError: Option[Errors] = None
@ -197,7 +199,7 @@ class AlterIsrManagerTest {
}
}
alterIsrManager.enqueue(AlterIsrItem(tp, new LeaderAndIsr(1, 1, List(1,2,3), 10), callback))
alterIsrManager.submit(AlterIsrItem(tp, new LeaderAndIsr(1, 1, List(1,2,3), 10), callback, 0))
time.sleep(100)
scheduler.tick()
@ -228,16 +230,16 @@ class AlterIsrManagerTest {
EasyMock.replay(brokerToController)
val scheduler = new MockScheduler(time)
val alterIsrManager = new AlterIsrManagerImpl(brokerToController, scheduler, time, brokerId, () => 2)
val alterIsrManager = new DefaultAlterIsrManager(brokerToController, scheduler, time, brokerId, () => 2)
alterIsrManager.start()
alterIsrManager.enqueue(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, List(1,2,3), 10), _ => {}))
alterIsrManager.submit(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, List(1,2,3), 10), _ => {}, 0))
time.sleep(100)
scheduler.tick() // Triggers a request
// Enqueue more updates
alterIsrManager.enqueue(AlterIsrItem(tp1, new LeaderAndIsr(1, 1, List(1,2,3), 10), _ => {}))
alterIsrManager.enqueue(AlterIsrItem(tp2, new LeaderAndIsr(1, 1, List(1,2,3), 10), _ => {}))
alterIsrManager.submit(AlterIsrItem(tp1, new LeaderAndIsr(1, 1, List(1,2,3), 10), _ => {}, 0))
alterIsrManager.submit(AlterIsrItem(tp2, new LeaderAndIsr(1, 1, List(1,2,3), 10), _ => {}, 0))
time.sleep(100)
scheduler.tick() // Trigger the schedule again, but no request this time
@ -267,7 +269,7 @@ class AlterIsrManagerTest {
EasyMock.replay(brokerToController)
val scheduler = new MockScheduler(time)
val alterIsrManager = new AlterIsrManagerImpl(brokerToController, scheduler, time, brokerId, () => 2)
val alterIsrManager = new DefaultAlterIsrManager(brokerToController, scheduler, time, brokerId, () => 2)
alterIsrManager.start()
val count = new AtomicInteger(0)
@ -275,9 +277,9 @@ class AlterIsrManagerTest {
count.incrementAndGet()
return
}
alterIsrManager.enqueue(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, List(1,2,3), 10), callback))
alterIsrManager.enqueue(AlterIsrItem(tp1, new LeaderAndIsr(1, 1, List(1,2,3), 10), callback))
alterIsrManager.enqueue(AlterIsrItem(tp2, new LeaderAndIsr(1, 1, List(1,2,3), 10), callback))
alterIsrManager.submit(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, List(1,2,3), 10), callback, 0))
alterIsrManager.submit(AlterIsrItem(tp1, new LeaderAndIsr(1, 1, List(1,2,3), 10), callback, 0))
alterIsrManager.submit(AlterIsrItem(tp2, new LeaderAndIsr(1, 1, List(1,2,3), 10), callback, 0))
time.sleep(100)
@ -300,4 +302,33 @@ class AlterIsrManagerTest {
assertEquals("Expected all callbacks to run", count.get, 3)
}
@Test
def testZkBasic(): Unit = {
val scheduler = new MockScheduler(time)
scheduler.startup()
val kafkaZkClient = Mockito.mock(classOf[KafkaZkClient])
Mockito.doAnswer(_ => (true, 2))
.when(kafkaZkClient)
.conditionalUpdatePath(anyString(), any(), ArgumentMatchers.eq(1), any())
Mockito.doAnswer(_ => (false, 2))
.when(kafkaZkClient)
.conditionalUpdatePath(anyString(), any(), ArgumentMatchers.eq(3), any())
val zkIsrManager = new ZkIsrManager(scheduler, time, kafkaZkClient)
zkIsrManager.start()
def expectMatch(expect: Either[Errors, LeaderAndIsr])(result: Either[Errors, LeaderAndIsr]): Unit = {
assertEquals(expect, result)
}
// Correct ZK version
assertTrue(zkIsrManager.submit(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, List(1,2,3), 1),
expectMatch(Right(new LeaderAndIsr(1, 1, List(1,2,3), 2))), 0)))
// Wrong ZK version
assertTrue(zkIsrManager.submit(AlterIsrItem(tp0, new LeaderAndIsr(1, 1, List(1,2,3), 3),
expectMatch(Left(Errors.INVALID_UPDATE_VERSION)), 0)))
}
}


@ -17,13 +17,14 @@
package kafka.server
import java.util.Properties
import kafka.api.ApiVersion
import java.util.Properties
import kafka.utils.TestUtils
import kafka.zk.ZooKeeperTestHarness
import org.apache.zookeeper.client.ZKClientConfig
import org.junit.Test
import org.junit.Assert.assertEquals
import org.junit.Assert.{assertEquals, fail}
import org.scalatest.Assertions.intercept
class KafkaServerTest extends ZooKeeperTestHarness {
@ -102,6 +103,32 @@ class KafkaServerTest extends ZooKeeperTestHarness {
assertEquals(zkClientValueToExpect(kafkaProp), zkClientConfig.get.getProperty(KafkaConfig.ZkSslConfigToSystemPropertyMap(kafkaProp))))
}
@Test
def testZkIsrManager(): Unit = {
val props = TestUtils.createBrokerConfigs(1, zkConnect).head
props.put(KafkaConfig.InterBrokerProtocolVersionProp, "2.7-IV1")
val server = TestUtils.createServer(KafkaConfig.fromProps(props))
server.replicaManager.alterIsrManager match {
case _: ZkIsrManager =>
case _ => fail("Should use ZK for ISR manager in versions before 2.7-IV2")
}
server.shutdown()
}
@Test
def testAlterIsrManager(): Unit = {
val props = TestUtils.createBrokerConfigs(1, zkConnect).head
props.put(KafkaConfig.InterBrokerProtocolVersionProp, ApiVersion.latestVersion.toString)
val server = TestUtils.createServer(KafkaConfig.fromProps(props))
server.replicaManager.alterIsrManager match {
case _: DefaultAlterIsrManager =>
case _ => fail("Should use AlterIsr for ISR manager in versions after 2.7-IV2")
}
server.shutdown()
}
def createServer(nodeId: Int, hostName: String, port: Int): KafkaServer = {
val props = TestUtils.createBrokerConfig(nodeId, zkConnect)
props.put(KafkaConfig.AdvertisedListenersProp, s"PLAINTEXT://$hostName:$port")


@ -64,6 +64,7 @@ class ReplicaManagerTest {
val topic = "test-topic"
val time = new MockTime
val scheduler = new MockScheduler(time)
val metrics = new Metrics
var kafkaZkClient: KafkaZkClient = _
var alterIsrManager: AlterIsrManager = _
@ -1708,9 +1709,11 @@ class ReplicaManagerTest {
result
}
private def setupReplicaManagerWithMockedPurgatories(timer: MockTimer, aliveBrokerIds: Seq[Int] = Seq(0, 1)): ReplicaManager = {
private def setupReplicaManagerWithMockedPurgatories(timer: MockTimer, aliveBrokerIds: Seq[Int] = Seq(0, 1),
propsModifier: Properties => Unit = _ => {}): ReplicaManager = {
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
props.put("log.dirs", TestUtils.tempRelativeDir("data").getAbsolutePath + "," + TestUtils.tempRelativeDir("data2").getAbsolutePath)
propsModifier.apply(props)
val config = KafkaConfig.fromProps(props)
val logProps = new Properties()
val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)), LogConfig(logProps))
@ -1733,7 +1736,7 @@ class ReplicaManagerTest {
val mockDelayedElectLeaderPurgatory = new DelayedOperationPurgatory[DelayedElectLeader](
purgatoryName = "DelayedElectLeader", timer, reaperEnabled = false)
new ReplicaManager(config, metrics, time, kafkaZkClient, new MockScheduler(time), mockLogMgr,
new ReplicaManager(config, metrics, time, kafkaZkClient, scheduler, mockLogMgr,
new AtomicBoolean(false), quotaManager, new BrokerTopicStats,
metadataCache, new LogDirFailureChannel(config.logDirs.size), mockProducePurgatory, mockFetchPurgatory,
mockDeleteRecordsPurgatory, mockDelayedElectLeaderPurgatory, Option(this.getClass.getName), alterIsrManager)


@ -23,13 +23,12 @@ import java.nio.charset.{Charset, StandardCharsets}
import java.nio.file.{Files, StandardOpenOption}
import java.security.cert.X509Certificate
import java.time.Duration
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import java.util.{Arrays, Collections, Properties}
import java.util.concurrent.{Callable, ExecutionException, Executors, TimeUnit}
import javax.net.ssl.X509TrustManager
import kafka.api._
import kafka.cluster.{Broker, EndPoint, IsrChangeListener}
import kafka.cluster.{Broker, EndPoint, IsrChangeListener, TopicConfigFetcher}
import kafka.log._
import kafka.security.auth.{Acl, Resource, Authorizer => LegacyAuthorizer}
import kafka.server._
@ -50,6 +49,7 @@ import org.apache.kafka.common.errors.{KafkaStorageException, UnknownTopicOrPart
import org.apache.kafka.common.header.Header
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.network.{ListenerName, Mode}
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity}
import org.apache.kafka.common.record._
import org.apache.kafka.common.resource.ResourcePattern
@ -1067,17 +1067,40 @@ object TestUtils extends Logging {
class MockAlterIsrManager extends AlterIsrManager {
val isrUpdates: mutable.Queue[AlterIsrItem] = new mutable.Queue[AlterIsrItem]()
val inFlight: AtomicBoolean = new AtomicBoolean(false)
override def enqueue(alterIsrItem: AlterIsrItem): Boolean = {
isrUpdates += alterIsrItem
true
override def submit(alterIsrItem: AlterIsrItem): Boolean = {
if (inFlight.compareAndSet(false, true)) {
isrUpdates += alterIsrItem
true
} else {
false
}
}
override def clearPending(topicPartition: TopicPartition): Unit = {
isrUpdates.clear()
inFlight.set(false)
}
override def start(): Unit = { }
def completeIsrUpdate(newZkVersion: Int): Unit = {
if (inFlight.compareAndSet(true, false)) {
val item = isrUpdates.head
item.callback.apply(Right(item.leaderAndIsr.withZkVersion(newZkVersion)))
} else {
fail("Expected an in-flight ISR update, but there was none")
}
}
def failIsrUpdate(error: Errors): Unit = {
if (inFlight.compareAndSet(true, false)) {
val item = isrUpdates.dequeue()
item.callback.apply(Left(error))
} else {
fail("Expected an in-flight ISR update, but there was none")
}
}
}
def createAlterIsrManager(): MockAlterIsrManager = {
@ -1106,6 +1129,14 @@ object TestUtils extends Logging {
new MockIsrChangeListener()
}
class MockTopicConfigFetcher(var props: Properties) extends TopicConfigFetcher {
override def fetch(): Properties = props
}
def createTopicConfigProvider(props: Properties): MockTopicConfigFetcher = {
new MockTopicConfigFetcher(props)
}
def produceMessages(servers: Seq[KafkaServer],
records: Seq[ProducerRecord[Array[Byte], Array[Byte]]],
acks: Int = -1): Unit = {


@ -22,7 +22,6 @@ import kafka.cluster.BrokerEndPoint;
import kafka.cluster.DelayedOperations;
import kafka.cluster.IsrChangeListener;
import kafka.cluster.Partition;
import kafka.cluster.PartitionStateStore;
import kafka.log.CleanerConfig;
import kafka.log.Defaults;
import kafka.log.LogAppendInfo;
@ -153,14 +152,12 @@ public class ReplicaFetcherThreadBenchmark {
.setReplicas(replicas)
.setIsNew(true);
PartitionStateStore partitionStateStore = Mockito.mock(PartitionStateStore.class);
Mockito.when(partitionStateStore.fetchTopicConfig()).thenReturn(new Properties());
IsrChangeListener isrChangeListener = Mockito.mock(IsrChangeListener.class);
OffsetCheckpoints offsetCheckpoints = Mockito.mock(OffsetCheckpoints.class);
Mockito.when(offsetCheckpoints.fetch(logDir.getAbsolutePath(), tp)).thenReturn(Option.apply(0L));
AlterIsrManager isrChannelManager = Mockito.mock(AlterIsrManager.class);
Partition partition = new Partition(tp, 100, ApiVersion$.MODULE$.latestVersion(),
0, Time.SYSTEM, partitionStateStore, isrChangeListener, new DelayedOperationsMock(tp),
0, Time.SYSTEM, Properties::new, isrChangeListener, new DelayedOperationsMock(tp),
Mockito.mock(MetadataCache.class), logManager, isrChannelManager);
partition.makeFollower(partitionState, offsetCheckpoints);


@ -21,7 +21,6 @@ import kafka.api.ApiVersion$;
import kafka.cluster.DelayedOperations;
import kafka.cluster.IsrChangeListener;
import kafka.cluster.Partition;
import kafka.cluster.PartitionStateStore;
import kafka.log.CleanerConfig;
import kafka.log.Defaults;
import kafka.log.LogConfig;
@ -116,14 +115,12 @@ public class PartitionMakeFollowerBenchmark {
TopicPartition tp = new TopicPartition("topic", 0);
PartitionStateStore partitionStateStore = Mockito.mock(PartitionStateStore.class);
Mockito.when(partitionStateStore.fetchTopicConfig()).thenReturn(new Properties());
Mockito.when(offsetCheckpoints.fetch(logDir.getAbsolutePath(), tp)).thenReturn(Option.apply(0L));
IsrChangeListener isrChangeListener = Mockito.mock(IsrChangeListener.class);
AlterIsrManager alterIsrManager = Mockito.mock(AlterIsrManager.class);
partition = new Partition(tp, 100,
ApiVersion$.MODULE$.latestVersion(), 0, Time.SYSTEM,
partitionStateStore, isrChangeListener, delayedOperations,
Properties::new, isrChangeListener, delayedOperations,
Mockito.mock(MetadataCache.class), logManager, alterIsrManager);
partition.createLogIfNotExists(true, false, offsetCheckpoints);
executorService.submit((Runnable) () -> {


@ -21,7 +21,6 @@ import kafka.api.ApiVersion$;
import kafka.cluster.DelayedOperations;
import kafka.cluster.IsrChangeListener;
import kafka.cluster.Partition;
import kafka.cluster.PartitionStateStore;
import kafka.log.CleanerConfig;
import kafka.log.Defaults;
import kafka.log.LogConfig;
@ -115,13 +114,11 @@ public class UpdateFollowerFetchStateBenchmark {
.setZkVersion(1)
.setReplicas(replicas)
.setIsNew(true);
PartitionStateStore partitionStateStore = Mockito.mock(PartitionStateStore.class);
Mockito.when(partitionStateStore.fetchTopicConfig()).thenReturn(new Properties());
IsrChangeListener isrChangeListener = Mockito.mock(IsrChangeListener.class);
AlterIsrManager alterIsrManager = Mockito.mock(AlterIsrManager.class);
partition = new Partition(topicPartition, 100,
ApiVersion$.MODULE$.latestVersion(), 0, Time.SYSTEM,
partitionStateStore, isrChangeListener, delayedOperations,
Properties::new, isrChangeListener, delayedOperations,
Mockito.mock(MetadataCache.class), logManager, alterIsrManager);
partition.makeLeader(partitionState, offsetCheckpoints);
}


@ -18,7 +18,6 @@ package org.apache.kafka.jmh.server;
import java.util.Properties;
import kafka.cluster.Partition;
import kafka.cluster.PartitionStateStore;
import kafka.log.CleanerConfig;
import kafka.log.LogConfig;
import kafka.log.LogManager;
@ -39,7 +38,6 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.mockito.Mockito;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
@ -150,8 +148,6 @@ public class CheckpointBench {
}
}
PartitionStateStore partitionStateStore = Mockito.mock(PartitionStateStore.class);
Mockito.when(partitionStateStore.fetchTopicConfig()).thenReturn(new Properties());
OffsetCheckpoints checkpoints = (logDir, topicPartition) -> Option.apply(0L);
for (TopicPartition topicPartition : topicPartitions) {
final Partition partition = this.replicaManager.createPartition(topicPartition);