KAFKA-8470: State change logs should not be in TRACE level (#8320)

1. Defaults the state-change log level to INFO.
2. At INFO level, the state-change log includes (a) request-level logging with just partition counts, and (b) the per-partition leader/ISR changes in the controller and in the broker (reduced to mostly one log entry per partition).

Reviewers: Jun Rao <junrao@gmail.com>
Stanislav Kozlovski 2020-03-26 23:53:40 +02:00 committed by GitHub
parent 008a3b21f6
commit c59835c1d7
8 changed files with 146 additions and 108 deletions

config/log4j.properties

@ -82,7 +82,7 @@ log4j.additivity.kafka.controller=false
log4j.logger.kafka.log.LogCleaner=INFO, cleanerAppender
log4j.additivity.kafka.log.LogCleaner=false
log4j.logger.state.change.logger=TRACE, stateChangeAppender
log4j.logger.state.change.logger=INFO, stateChangeAppender
log4j.additivity.state.change.logger=false
# Access denials are logged at INFO level, change to DEBUG to also log allowed accesses
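
Operators who still want per-partition state-change detail can restore the previous verbosity with a local override in config/log4j.properties, using the same keys as above:

log4j.logger.state.change.logger=TRACE, stateChangeAppender
log4j.additivity.state.change.logger=false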

core/src/main/scala/kafka/cluster/Partition.scala

@ -21,7 +21,7 @@ import java.util.{Optional, Properties}
import kafka.api.{ApiVersion, LeaderAndIsr}
import kafka.common.UnexpectedAppendOffsetException
import kafka.controller.KafkaController
import kafka.controller.{KafkaController, StateChangeLogger}
import kafka.log._
import kafka.metrics.KafkaMetricsGroup
import kafka.server._
@ -202,6 +202,7 @@ class Partition(val topicPartition: TopicPartition,
def topic: String = topicPartition.topic
def partitionId: Int = topicPartition.partition
private val stateChangeLogger = new StateChangeLogger(localBrokerId, inControllerContext = false, None)
private val remoteReplicasMap = new Pool[Int, Replica]
// The read lock is only required when multiple reads are executed that need to be consistent with each other
private val leaderIsrUpdateLock = new ReentrantReadWriteLock
@ -485,18 +486,22 @@ class Partition(val topicPartition: TopicPartition,
// to maintain the decision maker controller's epoch in the zookeeper path
controllerEpoch = partitionState.controllerEpoch
val isr = partitionState.isr.asScala.map(_.toInt).toSet
val addingReplicas = partitionState.addingReplicas.asScala.map(_.toInt)
val removingReplicas = partitionState.removingReplicas.asScala.map(_.toInt)
updateAssignmentAndIsr(
assignment = partitionState.replicas.asScala.map(_.toInt),
isr = partitionState.isr.asScala.map(_.toInt).toSet,
addingReplicas = partitionState.addingReplicas.asScala.map(_.toInt),
removingReplicas = partitionState.removingReplicas.asScala.map(_.toInt)
isr = isr,
addingReplicas = addingReplicas,
removingReplicas = removingReplicas
)
createLogIfNotExists(partitionState.isNew, isFutureReplica = false, highWatermarkCheckpoints)
val leaderLog = localLogOrException
val leaderEpochStartOffset = leaderLog.logEndOffset
info(s"$topicPartition starts at leader epoch ${partitionState.leaderEpoch} from " +
s"offset $leaderEpochStartOffset with high watermark ${leaderLog.highWatermark}. " +
stateChangeLogger.info(s"Leader $topicPartition starts at leader epoch ${partitionState.leaderEpoch} from " +
s"offset $leaderEpochStartOffset with high watermark ${leaderLog.highWatermark} " +
s"ISR ${isr.mkString(",")} addingReplicas ${addingReplicas.mkString(",")} removingReplicas ${removingReplicas.mkString(",")}." +
s"Previous leader epoch was $leaderEpoch.")
//We cache the leader epoch here, persisting it only if it's local (hence having a log dir)
@ -564,6 +569,12 @@ class Partition(val topicPartition: TopicPartition,
)
createLogIfNotExists(partitionState.isNew, isFutureReplica = false, highWatermarkCheckpoints)
val followerLog = localLogOrException
val leaderEpochEndOffset = followerLog.logEndOffset
stateChangeLogger.info(s"Follower $topicPartition starts at leader epoch ${partitionState.leaderEpoch} from " +
s"offset $leaderEpochEndOffset with high watermark ${followerLog.highWatermark}. " +
s"Previous leader epoch was $leaderEpoch.")
leaderEpoch = partitionState.leaderEpoch
leaderEpochStartOffsetOpt = None
zkVersion = partitionState.zkVersion
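
For orientation, a minimal sketch of the StateChangeLogger shape implied by this diff: the three-argument constructor appears in Partition.scala above and withControllerEpoch in the controller files below; the prefix format and the println backend are illustrative assumptions (the real class writes to the state.change.logger category configured in log4j.properties):

class StateChangeLogger(brokerId: Int, inControllerContext: Boolean, controllerEpoch: Option[Int]) {
  // assumed prefix format distinguishing controller-side from broker-side entries
  private val prefix =
    if (inControllerContext) s"[Controller id=$brokerId${controllerEpoch.fold("")(e => s" epoch=$e")}] "
    else s"[Broker id=$brokerId] "
  def withControllerEpoch(epoch: Int): StateChangeLogger =
    new StateChangeLogger(brokerId, inControllerContext, Some(epoch))
  def isTraceEnabled: Boolean = false // stand-in; the real check consults the logging backend
  def info(msg: => String): Unit = println(s"INFO $prefix$msg")
  def trace(msg: => String): Unit = if (isTraceEnabled) println(s"TRACE $prefix$msg")
}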

core/src/main/scala/kafka/controller/ControllerChannelManager.scala

@ -447,14 +447,18 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,
leaderAndIsrRequestMap.filterKeys(controllerContext.liveOrShuttingDownBrokerIds.contains).foreach {
case (broker, leaderAndIsrPartitionStates) =>
if (stateChangeLog.isTraceEnabled) {
leaderAndIsrPartitionStates.foreach { case (topicPartition, state) =>
val numBecomeLeaders = leaderAndIsrPartitionStates.count { case (topicPartition, state) =>
val isBecomeLeader = broker == state.leader
val typeOfRequest =
if (broker == state.leader) "become-leader"
if (isBecomeLeader) "become-leader"
else "become-follower"
if (stateChangeLog.isTraceEnabled)
stateChangeLog.trace(s"Sending $typeOfRequest LeaderAndIsr request $state to broker $broker for partition $topicPartition")
isBecomeLeader
}
}
stateChangeLog.info(s"Sending LeaderAndIsr request to broker $broker with $numBecomeLeaders become-leader " +
s"and ${leaderAndIsrPartitionStates.size - numBecomeLeaders} become-follower partitions")
val leaderIds = leaderAndIsrPartitionStates.map(_._2.leader).toSet
val leaders = controllerContext.liveOrShuttingDownBrokers.filter(b => leaderIds.contains(b.id)).map {
_.node(config.interBrokerListenerName)
@ -473,10 +477,8 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,
}
private def sendUpdateMetadataRequests(controllerEpoch: Int, stateChangeLog: StateChangeLogger): Unit = {
updateMetadataRequestPartitionInfoMap.foreach { case (tp, partitionState) =>
stateChangeLog.trace(s"Sending UpdateMetadata request $partitionState to brokers $updateMetadataRequestBrokerSet " +
s"for partition $tp")
}
stateChangeLog.info(s"Sending UpdateMetadata request to brokers $updateMetadataRequestBrokerSet " +
s"for ${updateMetadataRequestPartitionInfoMap.size} partitions")
val partitionStates = updateMetadataRequestPartitionInfoMap.values.toBuffer
val updateMetadataRequestVersion: Short =
@ -528,7 +530,7 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,
updateMetadataRequestPartitionInfoMap.clear()
}
private def sendStopReplicaRequests(controllerEpoch: Int): Unit = {
private def sendStopReplicaRequests(controllerEpoch: Int, stateChangeLog: StateChangeLogger): Unit = {
val stopReplicaRequestVersion: Short =
if (config.interBrokerProtocolVersion >= KAFKA_2_4_IV1) 2
else if (config.interBrokerProtocolVersion >= KAFKA_2_2_IV0) 1
@ -550,19 +552,24 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,
brokerEpoch, deletePartitions, partitions)
}
val traceEnabled = stateChangeLog.isTraceEnabled
stopReplicaRequestMap.filterKeys(controllerContext.liveOrShuttingDownBrokerIds.contains).foreach { case (brokerId, replicaInfoList) =>
val (stopReplicaWithDelete, stopReplicaWithoutDelete) = replicaInfoList.partition(r => r.deletePartition)
val brokerEpoch = controllerContext.liveBrokerIdAndEpochs(brokerId)
if (stopReplicaWithDelete.nonEmpty) {
debug(s"The stop replica request (delete = true) sent to broker $brokerId is ${stopReplicaWithDelete.mkString(",")}")
stateChangeLog.info(s"Sending a stop replica request (delete = true) for ${stopReplicaWithDelete.size} replicas to broker $brokerId")
if (traceEnabled)
stateChangeLog.trace(s"The stop replica request (delete = true) sent to broker $brokerId contains ${stopReplicaWithDelete.map(_.replica).mkString(",")}")
val stopReplicaRequest = createStopReplicaRequest(brokerEpoch, stopReplicaWithDelete, deletePartitions = true)
val callback = stopReplicaPartitionDeleteResponseCallback(brokerId) _
sendRequest(brokerId, stopReplicaRequest, callback)
}
if (stopReplicaWithoutDelete.nonEmpty) {
debug(s"The stop replica request (delete = false) sent to broker $brokerId is ${stopReplicaWithoutDelete.mkString(",")}")
stateChangeLog.info(s"Sending a stop replica request (delete = false) for ${stopReplicaWithoutDelete.size} replicas to broker $brokerId")
if (traceEnabled)
stateChangeLog.trace(s"The stop replica request (delete = false) sent to broker $brokerId contains ${stopReplicaWithoutDelete.map(_.replica).mkString(",")}")
val stopReplicaRequest = createStopReplicaRequest(brokerEpoch, stopReplicaWithoutDelete, deletePartitions = false)
sendRequest(brokerId, stopReplicaRequest)
}
@ -575,7 +582,7 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,
val stateChangeLog = stateChangeLogger.withControllerEpoch(controllerEpoch)
sendLeaderAndIsrRequest(controllerEpoch, stateChangeLog)
sendUpdateMetadataRequests(controllerEpoch, stateChangeLog)
sendStopReplicaRequests(controllerEpoch)
sendStopReplicaRequests(controllerEpoch, stateChangeLog)
} catch {
case e: Throwable =>
if (leaderAndIsrRequestMap.nonEmpty) {
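
The change above replaces one TRACE line per partition with a single INFO summary per destination broker. A self-contained sketch of the counting, where PartitionState is a stand-in for the request's per-partition state:

object BecomeLeaderCountDemo extends App {
  case class PartitionState(leader: Int)
  val broker = 1
  val states = Map(
    "topicA-0" -> PartitionState(leader = 1),
    "topicA-1" -> PartitionState(leader = 2),
    "topicB-0" -> PartitionState(leader = 1))
  val numBecomeLeaders = states.count { case (_, state) => state.leader == broker }
  // prints: Sending LeaderAndIsr request to broker 1 with 2 become-leader and 1 become-follower partitions
  println(s"Sending LeaderAndIsr request to broker $broker with $numBecomeLeaders become-leader " +
    s"and ${states.size - numBecomeLeaders} become-follower partitions")
}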

core/src/main/scala/kafka/controller/KafkaController.scala

@ -884,7 +884,7 @@ class KafkaController(val config: KafkaConfig,
case e: IllegalStateException =>
handleIllegalState(e)
}
stateChangeLog.trace(s"Sent LeaderAndIsr request $updatedLeaderIsrAndControllerEpoch with " +
stateChangeLog.info(s"Sent LeaderAndIsr request $updatedLeaderIsrAndControllerEpoch with " +
s"new replica assignment $assignment to leader ${updatedLeaderIsrAndControllerEpoch.leaderAndIsr.leader} " +
s"for partition being reassigned $topicPartition")

core/src/main/scala/kafka/controller/PartitionStateMachine.scala

@ -206,6 +206,7 @@ class ZkPartitionStateMachine(config: KafkaConfig,
partitionLeaderElectionStrategyOpt: Option[PartitionLeaderElectionStrategy]
): Map[TopicPartition, Either[Throwable, LeaderAndIsr]] = {
val stateChangeLog = stateChangeLogger.withControllerEpoch(controllerContext.epoch)
val traceEnabled = stateChangeLog.isTraceEnabled
partitions.foreach(partition => controllerContext.putPartitionStateIfNotExists(partition, NonExistentPartition))
val (validPartitions, invalidPartitions) = controllerContext.checkValidPartitionStateChange(partitions, targetState)
invalidPartitions.foreach(partition => logInvalidTransition(partition, targetState))
@ -213,7 +214,7 @@ class ZkPartitionStateMachine(config: KafkaConfig,
targetState match {
case NewPartition =>
validPartitions.foreach { partition =>
stateChangeLog.trace(s"Changed partition $partition state from ${partitionState(partition)} to $targetState with " +
stateChangeLog.info(s"Changed partition $partition state from ${partitionState(partition)} to $targetState with " +
s"assigned replicas ${controllerContext.partitionReplicaAssignment(partition).mkString(",")}")
controllerContext.putPartitionState(partition, NewPartition)
}
@ -224,7 +225,7 @@ class ZkPartitionStateMachine(config: KafkaConfig,
if (uninitializedPartitions.nonEmpty) {
val successfulInitializations = initializeLeaderAndIsrForPartitions(uninitializedPartitions)
successfulInitializations.foreach { partition =>
stateChangeLog.trace(s"Changed partition $partition from ${partitionState(partition)} to $targetState with state " +
stateChangeLog.info(s"Changed partition $partition from ${partitionState(partition)} to $targetState with state " +
s"${controllerContext.partitionLeadershipInfo(partition).leaderAndIsr}")
controllerContext.putPartitionState(partition, OnlinePartition)
}
@ -239,7 +240,7 @@ class ZkPartitionStateMachine(config: KafkaConfig,
electionResults.foreach {
case (partition, Right(leaderAndIsr)) =>
stateChangeLog.trace(
stateChangeLog.info(
s"Changed partition $partition from ${partitionState(partition)} to $targetState with state $leaderAndIsr"
)
controllerContext.putPartitionState(partition, OnlinePartition)
@ -252,12 +253,14 @@ class ZkPartitionStateMachine(config: KafkaConfig,
}
case OfflinePartition =>
validPartitions.foreach { partition =>
if (traceEnabled)
stateChangeLog.trace(s"Changed partition $partition state from ${partitionState(partition)} to $targetState")
controllerContext.putPartitionState(partition, OfflinePartition)
}
Map.empty
case NonExistentPartition =>
validPartitions.foreach { partition =>
if (traceEnabled)
stateChangeLog.trace(s"Changed partition $partition state from ${partitionState(partition)} to $targetState")
controllerContext.putPartitionState(partition, NonExistentPartition)
}
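
Hoisting the isTraceEnabled check out of the per-partition loops avoids a repeated level check (and, for logging APIs that take the message by value, the string interpolation itself) on every partition when TRACE is off. A minimal sketch of the pattern, assuming slf4j-api on the classpath as the backend:

import org.slf4j.LoggerFactory

object GuardedTraceDemo extends App {
  private val log = LoggerFactory.getLogger("state.change.logger")
  val partitions = (0 until 10000).map(i => s"topic-$i")
  val traceEnabled = log.isTraceEnabled // checked once, outside the loop
  partitions.foreach { partition =>
    if (traceEnabled)
      log.trace(s"Changed partition $partition state from OfflinePartition to NonExistentPartition")
  }
}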

core/src/main/scala/kafka/controller/ReplicaStateMachine.scala

@ -156,6 +156,8 @@ class ZkReplicaStateMachine(config: KafkaConfig,
* @param targetState The end state that the replica should be moved to
*/
private def doHandleStateChanges(replicaId: Int, replicas: Seq[PartitionAndReplica], targetState: ReplicaState): Unit = {
val stateLogger = stateChangeLogger.withControllerEpoch(controllerContext.epoch)
val traceEnabled = stateLogger.isTraceEnabled
replicas.foreach(replica => controllerContext.putReplicaStateIfNotExists(replica, NonExistentReplica))
val (validReplicas, invalidReplicas) = controllerContext.checkValidReplicaStateChange(replicas, targetState)
invalidReplicas.foreach(replica => logInvalidTransition(replica, targetState))
@ -177,11 +179,13 @@ class ZkReplicaStateMachine(config: KafkaConfig,
leaderIsrAndControllerEpoch,
controllerContext.partitionFullReplicaAssignment(replica.topicPartition),
isNew = true)
logSuccessfulTransition(replicaId, partition, currentState, NewReplica)
if (traceEnabled)
logSuccessfulTransition(stateLogger, replicaId, partition, currentState, NewReplica)
controllerContext.putReplicaState(replica, NewReplica)
}
case None =>
logSuccessfulTransition(replicaId, partition, currentState, NewReplica)
if (traceEnabled)
logSuccessfulTransition(stateLogger, replicaId, partition, currentState, NewReplica)
controllerContext.putReplicaState(replica, NewReplica)
}
}
@ -208,7 +212,8 @@ class ZkReplicaStateMachine(config: KafkaConfig,
case None =>
}
}
logSuccessfulTransition(replicaId, partition, currentState, OnlineReplica)
if (traceEnabled)
logSuccessfulTransition(stateLogger, replicaId, partition, currentState, OnlineReplica)
controllerContext.putReplicaState(replica, OnlineReplica)
}
case OfflineReplica =>
@ -220,6 +225,7 @@ class ZkReplicaStateMachine(config: KafkaConfig,
}
val updatedLeaderIsrAndControllerEpochs = removeReplicasFromIsr(replicaId, replicasWithLeadershipInfo.map(_.topicPartition))
updatedLeaderIsrAndControllerEpochs.foreach { case (partition, leaderIsrAndControllerEpoch) =>
stateLogger.info(s"Partition $partition state changed to $leaderIsrAndControllerEpoch after removing replica $replicaId from the ISR as part of transition to $OfflineReplica")
if (!controllerContext.isTopicQueuedUpForDeletion(partition.topic)) {
val recipients = controllerContext.partitionReplicaAssignment(partition).filterNot(_ == replicaId)
controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(recipients,
@ -229,33 +235,38 @@ class ZkReplicaStateMachine(config: KafkaConfig,
}
val replica = PartitionAndReplica(partition, replicaId)
val currentState = controllerContext.replicaState(replica)
logSuccessfulTransition(replicaId, partition, currentState, OfflineReplica)
if (traceEnabled)
logSuccessfulTransition(stateLogger, replicaId, partition, currentState, OfflineReplica)
controllerContext.putReplicaState(replica, OfflineReplica)
}
replicasWithoutLeadershipInfo.foreach { replica =>
val currentState = controllerContext.replicaState(replica)
logSuccessfulTransition(replicaId, replica.topicPartition, currentState, OfflineReplica)
if (traceEnabled)
logSuccessfulTransition(stateLogger, replicaId, replica.topicPartition, currentState, OfflineReplica)
controllerBrokerRequestBatch.addUpdateMetadataRequestForBrokers(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set(replica.topicPartition))
controllerContext.putReplicaState(replica, OfflineReplica)
}
case ReplicaDeletionStarted =>
validReplicas.foreach { replica =>
val currentState = controllerContext.replicaState(replica)
logSuccessfulTransition(replicaId, replica.topicPartition, currentState, ReplicaDeletionStarted)
if (traceEnabled)
logSuccessfulTransition(stateLogger, replicaId, replica.topicPartition, currentState, ReplicaDeletionStarted)
controllerContext.putReplicaState(replica, ReplicaDeletionStarted)
controllerBrokerRequestBatch.addStopReplicaRequestForBrokers(Seq(replicaId), replica.topicPartition, deletePartition = true)
}
case ReplicaDeletionIneligible =>
validReplicas.foreach { replica =>
val currentState = controllerContext.replicaState(replica)
logSuccessfulTransition(replicaId, replica.topicPartition, currentState, ReplicaDeletionIneligible)
if (traceEnabled)
logSuccessfulTransition(stateLogger, replicaId, replica.topicPartition, currentState, ReplicaDeletionIneligible)
controllerContext.putReplicaState(replica, ReplicaDeletionIneligible)
}
case ReplicaDeletionSuccessful =>
validReplicas.foreach { replica =>
val currentState = controllerContext.replicaState(replica)
logSuccessfulTransition(replicaId, replica.topicPartition, currentState, ReplicaDeletionSuccessful)
if (traceEnabled)
logSuccessfulTransition(stateLogger, replicaId, replica.topicPartition, currentState, ReplicaDeletionSuccessful)
controllerContext.putReplicaState(replica, ReplicaDeletionSuccessful)
}
case NonExistentReplica =>
@ -266,7 +277,8 @@ class ZkReplicaStateMachine(config: KafkaConfig,
.removeReplica(replica.replica)
controllerContext.updatePartitionFullReplicaAssignment(replica.topicPartition, newAssignedReplicas)
logSuccessfulTransition(replicaId, replica.topicPartition, currentState, NonExistentReplica)
if (traceEnabled)
logSuccessfulTransition(stateLogger, replicaId, replica.topicPartition, currentState, NonExistentReplica)
controllerContext.removeReplicaState(replica)
}
}
@ -420,9 +432,9 @@ class ZkReplicaStateMachine(config: KafkaConfig,
(result.toMap, partitionsWithNoLeaderAndIsrInZk)
}
private def logSuccessfulTransition(replicaId: Int, partition: TopicPartition, currState: ReplicaState, targetState: ReplicaState): Unit = {
stateChangeLogger.withControllerEpoch(controllerContext.epoch)
.trace(s"Changed state of replica $replicaId for partition $partition from $currState to $targetState")
private def logSuccessfulTransition(logger: StateChangeLogger, replicaId: Int, partition: TopicPartition,
currState: ReplicaState, targetState: ReplicaState): Unit = {
logger.trace(s"Changed state of replica $replicaId for partition $partition from $currState to $targetState")
}
private def logInvalidTransition(replica: PartitionAndReplica, targetState: ReplicaState): Unit = {
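
The refactored helper takes the epoch-bound logger as a parameter so it is constructed once per doHandleStateChanges batch instead of once per transition. A toy, self-contained illustration, with EpochLogger standing in for StateChangeLogger:

object HoistedLoggerDemo extends App {
  final case class EpochLogger(epoch: Int) {
    def trace(msg: String): Unit = println(s"TRACE [Controller epoch=$epoch] $msg")
  }
  // stand-in for stateChangeLogger.withControllerEpoch(controllerContext.epoch)
  def withControllerEpoch(epoch: Int): EpochLogger = EpochLogger(epoch)
  val stateLogger = withControllerEpoch(42) // built once per batch, not once per replica
  Seq(1, 2, 3).foreach { replicaId =>
    stateLogger.trace(s"Changed state of replica $replicaId for partition topic-0 from NewReplica to OnlineReplica")
  }
}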

core/src/main/scala/kafka/server/MetadataCache.scala

@ -285,7 +285,7 @@ class MetadataCache(brokerId: Int) extends Logging {
val aliveBrokers = new mutable.LongMap[Broker](metadataSnapshot.aliveBrokers.size)
val aliveNodes = new mutable.LongMap[collection.Map[ListenerName, Node]](metadataSnapshot.aliveNodes.size)
val controllerId = updateMetadataRequest.controllerId match {
val controllerIdOpt = updateMetadataRequest.controllerId match {
case id if id < 0 => None
case id => Some(id)
}
@ -312,7 +312,7 @@ class MetadataCache(brokerId: Int) extends Logging {
val deletedPartitions = new mutable.ArrayBuffer[TopicPartition]
if (!updateMetadataRequest.partitionStates.iterator.hasNext) {
metadataSnapshot = MetadataSnapshot(metadataSnapshot.partitionStates, controllerId, aliveBrokers, aliveNodes)
metadataSnapshot = MetadataSnapshot(metadataSnapshot.partitionStates, controllerIdOpt, aliveBrokers, aliveNodes)
} else {
//since kafka may do partial metadata updates, we start by copying the previous state
val partitionStates = new mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataPartitionState]](metadataSnapshot.partitionStates.size)
@ -321,22 +321,32 @@ class MetadataCache(brokerId: Int) extends Logging {
copy ++= oldPartitionStates
partitionStates += (topic -> copy)
}
updateMetadataRequest.partitionStates.asScala.foreach { info =>
val traceEnabled = stateChangeLogger.isTraceEnabled
val controllerId = updateMetadataRequest.controllerId
val controllerEpoch = updateMetadataRequest.controllerEpoch
val tp = new TopicPartition(info.topicName, info.partitionIndex)
if (info.leader == LeaderAndIsr.LeaderDuringDelete) {
val newStates = updateMetadataRequest.partitionStates.asScala
newStates.foreach { state =>
// per-partition logging here can be very expensive since it goes through all partitions in the cluster
val tp = new TopicPartition(state.topicName, state.partitionIndex)
if (state.leader == LeaderAndIsr.LeaderDuringDelete) {
removePartitionInfo(partitionStates, tp.topic, tp.partition)
if (traceEnabled)
stateChangeLogger.trace(s"Deleted partition $tp from metadata cache in response to UpdateMetadata " +
s"request sent by controller $controllerId epoch $controllerEpoch with correlation id $correlationId")
deletedPartitions += tp
} else {
addOrUpdatePartitionInfo(partitionStates, tp.topic, tp.partition, info)
stateChangeLogger.trace(s"Cached leader info $info for partition $tp in response to " +
addOrUpdatePartitionInfo(partitionStates, tp.topic, tp.partition, state)
if (traceEnabled)
stateChangeLogger.trace(s"Cached leader info $state for partition $tp in response to " +
s"UpdateMetadata request sent by controller $controllerId epoch $controllerEpoch with correlation id $correlationId")
}
}
metadataSnapshot = MetadataSnapshot(partitionStates, controllerId, aliveBrokers, aliveNodes)
val cachedPartitionsCount = newStates.size - deletedPartitions.size
stateChangeLogger.info(s"Add $cachedPartitionsCount partitions and deleted ${deletedPartitions.size} partitions from metadata cache " +
s"in response to UpdateMetadata request sent by controller $controllerId epoch $controllerEpoch with correlation id $correlationId")
metadataSnapshot = MetadataSnapshot(partitionStates, controllerIdOpt, aliveBrokers, aliveNodes)
}
deletedPartitions
}
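
A self-contained sketch of the summary arithmetic above; the -2 sentinel for LeaderDuringDelete is an assumption matching LeaderAndIsr in this code base:

object MetadataCacheSummaryDemo extends App {
  val LeaderDuringDelete = -2 // assumed value of LeaderAndIsr.LeaderDuringDelete
  case class PartitionState(topic: String, partition: Int, leader: Int)
  val newStates = Seq(
    PartitionState("a", 0, leader = 1),
    PartitionState("a", 1, leader = LeaderDuringDelete),
    PartitionState("b", 0, leader = 2))
  val deletedPartitions = newStates.filter(_.leader == LeaderDuringDelete)
  val cachedPartitionsCount = newStates.size - deletedPartitions.size
  println(s"Added $cachedPartitionsCount partitions and deleted ${deletedPartitions.size} partitions from metadata cache")
}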

core/src/main/scala/kafka/server/ReplicaManager.scala

@ -339,7 +339,7 @@ class ReplicaManager(val config: KafkaConfig,
}
case HostedPartition.None =>
stateChangeLogger.trace(s"Ignoring stop replica (delete=$deletePartition) for partition " +
stateChangeLogger.info(s"Ignoring stop replica (delete=$deletePartition) for partition " +
s"$topicPartition as replica doesn't exist on broker")
}
@ -377,7 +377,9 @@ class ReplicaManager(val config: KafkaConfig,
// First stop fetchers for all partitions, then stop the corresponding replicas
replicaFetcherManager.removeFetcherForPartitions(partitions)
replicaAlterLogDirsManager.removeFetcherForPartitions(partitions)
for (topicPartition <- partitions){
stateChangeLogger.info(s"Handling stop replica (delete=${stopReplicaRequest.deletePartitions()}) for ${partitions.size} partitions")
for (topicPartition <- partitions) {
try {
stopReplica(topicPartition, stopReplicaRequest.deletePartitions)
responseMap.put(topicPartition, Errors.NONE)
@ -1176,29 +1178,32 @@ class ReplicaManager(val config: KafkaConfig,
def becomeLeaderOrFollower(correlationId: Int,
leaderAndIsrRequest: LeaderAndIsrRequest,
onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit): LeaderAndIsrResponse = {
if (stateChangeLogger.isTraceEnabled) {
val controllerId = leaderAndIsrRequest.controllerId
val requestPartitionStates = leaderAndIsrRequest.partitionStates.asScala
stateChangeLogger.info(s"Handling LeaderAndIsr request correlationId $correlationId from controller " +
s"$controllerId for ${requestPartitionStates.size} partitions")
if (stateChangeLogger.isTraceEnabled)
leaderAndIsrRequest.partitionStates.asScala.foreach { partitionState =>
stateChangeLogger.trace(s"Received LeaderAndIsr request $partitionState " +
s"correlation id $correlationId from controller ${leaderAndIsrRequest.controllerId} " +
s"correlation id $correlationId from controller $controllerId " +
s"epoch ${leaderAndIsrRequest.controllerEpoch}")
}
}
replicaStateChangeLock synchronized {
if (leaderAndIsrRequest.controllerEpoch < controllerEpoch) {
stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from controller ${leaderAndIsrRequest.controllerId} with " +
stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from controller $controllerId with " +
s"correlation id $correlationId since its controller epoch ${leaderAndIsrRequest.controllerEpoch} is old. " +
s"Latest known controller epoch is $controllerEpoch")
leaderAndIsrRequest.getErrorResponse(0, Errors.STALE_CONTROLLER_EPOCH.exception)
} else {
val responseMap = new mutable.HashMap[TopicPartition, Errors]
val controllerId = leaderAndIsrRequest.controllerId
controllerEpoch = leaderAndIsrRequest.controllerEpoch
// First check partition's leader epoch
val partitionStates = new mutable.HashMap[Partition, LeaderAndIsrPartitionState]()
val updatedPartitions = new mutable.HashSet[Partition]
leaderAndIsrRequest.partitionStates.asScala.foreach { partitionState =>
requestPartitionStates.foreach { partitionState =>
val topicPartition = new TopicPartition(partitionState.topicName, partitionState.partitionIndex)
val partitionOpt = getPartition(topicPartition) match {
case HostedPartition.Offline =>
@ -1242,7 +1247,7 @@ class ReplicaManager(val config: KafkaConfig,
s"leader epoch $currentLeaderEpoch")
responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH)
} else {
stateChangeLogger.debug(s"Ignoring LeaderAndIsr request from " +
stateChangeLogger.info(s"Ignoring LeaderAndIsr request from " +
s"controller $controllerId with correlation id $correlationId " +
s"epoch $controllerEpoch for partition $topicPartition since its associated " +
s"leader epoch $requestLeaderEpoch matches the current leader epoch")
@ -1354,29 +1359,29 @@ class ReplicaManager(val config: KafkaConfig,
correlationId: Int,
responseMap: mutable.Map[TopicPartition, Errors],
highWatermarkCheckpoints: OffsetCheckpoints): Set[Partition] = {
val traceEnabled = stateChangeLogger.isTraceEnabled
partitionStates.keys.foreach { partition =>
if (traceEnabled)
stateChangeLogger.trace(s"Handling LeaderAndIsr request correlationId $correlationId from " +
s"controller $controllerId epoch $controllerEpoch starting the become-leader transition for " +
s"partition ${partition.topicPartition}")
}
for (partition <- partitionStates.keys)
responseMap.put(partition.topicPartition, Errors.NONE)
}
val partitionsToMakeLeaders = mutable.Set[Partition]()
try {
// First stop fetchers for all the partitions
replicaFetcherManager.removeFetcherForPartitions(partitionStates.keySet.map(_.topicPartition))
stateChangeLogger.info(s"Stopped fetchers as part of LeaderAndIsr request correlationId $correlationId from " +
s"controller $controllerId epoch $controllerEpoch as part of the become-leader transition for " +
s"${partitionStates.size} partitions")
// Update the partition information to be the leader
partitionStates.foreach { case (partition, partitionState) =>
try {
if (partition.makeLeader(partitionState, highWatermarkCheckpoints)) {
if (partition.makeLeader(partitionState, highWatermarkCheckpoints))
partitionsToMakeLeaders += partition
stateChangeLogger.trace(s"Stopped fetchers as part of become-leader request from " +
s"controller $controllerId epoch $controllerEpoch with correlation id $correlationId for partition ${partition.topicPartition} " +
s"(last update controller epoch ${partitionState.controllerEpoch})")
} else
else
stateChangeLogger.info(s"Skipped the become-leader state change after marking its " +
s"partition as leader with correlation id $correlationId from controller $controllerId epoch $controllerEpoch for " +
s"partition ${partition.topicPartition} (last update controller epoch ${partitionState.controllerEpoch}) " +
@ -1403,6 +1408,7 @@ class ReplicaManager(val config: KafkaConfig,
throw e
}
if (traceEnabled)
partitionStates.keys.foreach { partition =>
stateChangeLogger.trace(s"Completed LeaderAndIsr request correlationId $correlationId from controller $controllerId " +
s"epoch $controllerEpoch for the become-leader transition for partition ${partition.topicPartition}")
@ -1435,14 +1441,14 @@ class ReplicaManager(val config: KafkaConfig,
correlationId: Int,
responseMap: mutable.Map[TopicPartition, Errors],
highWatermarkCheckpoints: OffsetCheckpoints) : Set[Partition] = {
val traceLoggingEnabled = stateChangeLogger.isTraceEnabled
partitionStates.foreach { case (partition, partitionState) =>
if (traceLoggingEnabled)
stateChangeLogger.trace(s"Handling LeaderAndIsr request correlationId $correlationId from controller $controllerId " +
s"epoch $controllerEpoch starting the become-follower transition for partition ${partition.topicPartition} with leader " +
s"${partitionState.leader}")
}
for (partition <- partitionStates.keys)
responseMap.put(partition.topicPartition, Errors.NONE)
}
val partitionsToMakeFollower: mutable.Set[Partition] = mutable.Set()
try {
@ -1487,29 +1493,22 @@ class ReplicaManager(val config: KafkaConfig,
}
replicaFetcherManager.removeFetcherForPartitions(partitionsToMakeFollower.map(_.topicPartition))
partitionsToMakeFollower.foreach { partition =>
stateChangeLogger.trace(s"Stopped fetchers as part of become-follower request from controller $controllerId " +
s"epoch $controllerEpoch with correlation id $correlationId for partition ${partition.topicPartition} with leader " +
s"${partitionStates(partition).leader}")
}
stateChangeLogger.info(s"Stopped fetchers as part of become-follower request from controller $controllerId " +
s"epoch $controllerEpoch with correlation id $correlationId for ${partitionsToMakeFollower.size} partitions")
partitionsToMakeFollower.foreach { partition =>
completeDelayedFetchOrProduceRequests(partition.topicPartition)
}
partitionsToMakeFollower.foreach { partition =>
stateChangeLogger.trace(s"Truncated logs and checkpointed recovery boundaries for partition " +
s"${partition.topicPartition} as part of become-follower request with correlation id $correlationId from " +
s"controller $controllerId epoch $controllerEpoch with leader ${partitionStates(partition).leader}")
}
if (isShuttingDown.get()) {
if (traceLoggingEnabled) {
partitionsToMakeFollower.foreach { partition =>
stateChangeLogger.trace(s"Skipped the adding-fetcher step of the become-follower state " +
s"change with correlation id $correlationId from controller $controllerId epoch $controllerEpoch for " +
s"partition ${partition.topicPartition} with leader ${partitionStates(partition).leader} " +
"since it is shutting down")
}
}
} else {
// we do not need to check if the leader exists again since this has been done at the beginning of this process
val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map { partition =>
@ -1520,11 +1519,6 @@ class ReplicaManager(val config: KafkaConfig,
}.toMap
replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset)
partitionsToMakeFollowerWithLeaderAndOffset.foreach { case (partition, initialFetchState) =>
stateChangeLogger.trace(s"Started fetcher to new leader as part of become-follower " +
s"request from controller $controllerId epoch $controllerEpoch with correlation id $correlationId for " +
s"partition $partition with leader ${initialFetchState.leader}")
}
}
} catch {
case e: Throwable =>
@ -1534,6 +1528,7 @@ class ReplicaManager(val config: KafkaConfig,
throw e
}
if (traceLoggingEnabled)
partitionStates.keys.foreach { partition =>
stateChangeLogger.trace(s"Completed LeaderAndIsr request correlationId $correlationId from controller $controllerId " +
s"epoch $controllerEpoch for the become-follower transition for partition ${partition.topicPartition} with leader " +