mirror of https://github.com/apache/kafka.git
MINOR: Replace unused variables by underscore (#5003)
And remove one unused expression. Reviewers: Ismael Juma <ismael@juma.me.uk>
This commit is contained in:
parent
8c5d7e0408
commit
8d1e96181d
|
|
@ -65,7 +65,7 @@ object LogDirsCommand {
|
|||
Map(
|
||||
"logDir" -> logDir,
|
||||
"error" -> logDirInfo.error.exceptionName(),
|
||||
"partitions" -> logDirInfo.replicaInfos.asScala.filter { case (topicPartition, replicaInfo) =>
|
||||
"partitions" -> logDirInfo.replicaInfos.asScala.filter { case (topicPartition, _) =>
|
||||
topicSet.isEmpty || topicSet.contains(topicPartition.topic)
|
||||
}.map { case (topicPartition, replicaInfo) =>
|
||||
Map(
|
||||
|
|
|
|||
|
|
@ -256,7 +256,7 @@ object ReassignPartitionsCommand extends Logging {
|
|||
val newReplicas = partitionFields("replicas").to[Seq[Int]]
|
||||
val newLogDirs = partitionFields.get("log_dirs") match {
|
||||
case Some(jsonValue) => jsonValue.to[Seq[String]]
|
||||
case None => newReplicas.map(r => AnyLogDir)
|
||||
case None => newReplicas.map(_ => AnyLogDir)
|
||||
}
|
||||
if (newReplicas.size != newLogDirs.size)
|
||||
throw new AdminCommandFailedException(s"Size of replicas list $newReplicas is different from " +
|
||||
|
|
@ -569,7 +569,7 @@ class ReassignPartitionsCommand(zkClient: KafkaZkClient,
|
|||
} catch {
|
||||
case t: ExecutionException =>
|
||||
t.getCause match {
|
||||
case e: ReplicaNotAvailableException => None // It is OK if the replica is not available at this moment
|
||||
case _: ReplicaNotAvailableException => None // It is OK if the replica is not available at this moment
|
||||
case e: Throwable => throw new AdminCommandFailedException(s"Failed to alter dir for $replica", e)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -79,7 +79,7 @@ class RoundRobinAssignor() extends PartitionAssignor with Logging {
|
|||
|
||||
if (ctx.consumersForTopic.nonEmpty) {
|
||||
// Collect consumer thread ids across all topics, remove duplicates, and sort to ensure determinism
|
||||
val allThreadIds = ctx.consumersForTopic.flatMap { case (topic, threadIds) =>
|
||||
val allThreadIds = ctx.consumersForTopic.flatMap { case (_, threadIds) =>
|
||||
threadIds
|
||||
}.toSet.toSeq.sorted
|
||||
|
||||
|
|
|
|||
|
|
@ -300,7 +300,7 @@ class PartitionStateMachine(config: KafkaConfig,
|
|||
failedElections.put(partition, getDataResponse.resultException.get)
|
||||
}
|
||||
}
|
||||
val (invalidPartitionsForElection, validPartitionsForElection) = leaderIsrAndControllerEpochPerPartition.partition { case (partition, leaderIsrAndControllerEpoch) =>
|
||||
val (invalidPartitionsForElection, validPartitionsForElection) = leaderIsrAndControllerEpochPerPartition.partition { case (_, leaderIsrAndControllerEpoch) =>
|
||||
leaderIsrAndControllerEpoch.controllerEpoch > controllerContext.epoch
|
||||
}
|
||||
invalidPartitionsForElection.foreach { case (partition, leaderIsrAndControllerEpoch) =>
|
||||
|
|
@ -323,12 +323,12 @@ class PartitionStateMachine(config: KafkaConfig,
|
|||
case ControlledShutdownPartitionLeaderElectionStrategy =>
|
||||
leaderForControlledShutdown(validPartitionsForElection, shuttingDownBrokers).partition { case (_, newLeaderAndIsrOpt, _) => newLeaderAndIsrOpt.isEmpty }
|
||||
}
|
||||
partitionsWithoutLeaders.foreach { case (partition, leaderAndIsrOpt, recipients) =>
|
||||
partitionsWithoutLeaders.foreach { case (partition, _, _) =>
|
||||
val failMsg = s"Failed to elect leader for partition $partition under strategy $partitionLeaderElectionStrategy"
|
||||
failedElections.put(partition, new StateChangeFailedException(failMsg))
|
||||
}
|
||||
val recipientsPerPartition = partitionsWithLeaders.map { case (partition, leaderAndIsrOpt, recipients) => partition -> recipients }.toMap
|
||||
val adjustedLeaderAndIsrs = partitionsWithLeaders.map { case (partition, leaderAndIsrOpt, recipients) => partition -> leaderAndIsrOpt.get }.toMap
|
||||
val recipientsPerPartition = partitionsWithLeaders.map { case (partition, _, recipients) => partition -> recipients }.toMap
|
||||
val adjustedLeaderAndIsrs = partitionsWithLeaders.map { case (partition, leaderAndIsrOpt, _) => partition -> leaderAndIsrOpt.get }.toMap
|
||||
val UpdateLeaderAndIsrResult(successfulUpdates, updatesToRetry, failedUpdates) = zkClient.updateLeaderAndIsr(
|
||||
adjustedLeaderAndIsrs, controllerContext.epoch)
|
||||
successfulUpdates.foreach { case (partition, leaderAndIsr) =>
|
||||
|
|
|
|||
|
|
@ -292,7 +292,7 @@ class ReplicaStateMachine(config: KafkaConfig,
|
|||
Seq[TopicPartition],
|
||||
Map[TopicPartition, Exception]) = {
|
||||
val (leaderAndIsrs, partitionsWithNoLeaderAndIsrInZk, failedStateReads) = getTopicPartitionStatesFromZk(partitions)
|
||||
val (leaderAndIsrsWithReplica, leaderAndIsrsWithoutReplica) = leaderAndIsrs.partition { case (partition, leaderAndIsr) => leaderAndIsr.isr.contains(replicaId) }
|
||||
val (leaderAndIsrsWithReplica, leaderAndIsrsWithoutReplica) = leaderAndIsrs.partition { case (_, leaderAndIsr) => leaderAndIsr.isr.contains(replicaId) }
|
||||
val adjustedLeaderAndIsrs = leaderAndIsrsWithReplica.mapValues { leaderAndIsr =>
|
||||
val newLeader = if (replicaId == leaderAndIsr.leader) LeaderAndIsr.NoLeader else leaderAndIsr.leader
|
||||
val adjustedIsr = if (leaderAndIsr.isr.size == 1) leaderAndIsr.isr else leaderAndIsr.isr.filter(_ != replicaId)
|
||||
|
|
|
|||
|
|
@ -465,7 +465,7 @@ class GroupMetadataManager(brokerId: Int,
|
|||
}
|
||||
|
||||
case Some(topicPartitions) =>
|
||||
topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { topicPartition =>
|
||||
topicPartitions.map { topicPartition =>
|
||||
val partitionData = group.offset(topicPartition) match {
|
||||
case None =>
|
||||
new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.NONE)
|
||||
|
|
|
|||
|
|
@ -140,7 +140,7 @@ class TransactionStateManager(brokerId: Int,
|
|||
val now = time.milliseconds()
|
||||
inReadLock(stateLock) {
|
||||
val transactionalIdByPartition: Map[Int, mutable.Iterable[TransactionalIdCoordinatorEpochAndMetadata]] =
|
||||
transactionMetadataCache.flatMap { case (partition, entry) =>
|
||||
transactionMetadataCache.flatMap { case (_, entry) =>
|
||||
entry.metadataPerTransactionalId.filter { case (_, txnMetadata) => txnMetadata.state match {
|
||||
case Empty | CompleteCommit | CompleteAbort => true
|
||||
case _ => false
|
||||
|
|
|
|||
|
|
@ -755,7 +755,7 @@ class Log(@volatile var dir: File,
|
|||
records = validRecords)
|
||||
|
||||
// update the producer state
|
||||
for ((producerId, producerAppendInfo) <- updatedProducers) {
|
||||
for ((_, producerAppendInfo) <- updatedProducers) {
|
||||
producerAppendInfo.maybeCacheTxnFirstOffsetMetadata(logOffsetMetadata)
|
||||
producerStateManager.update(producerAppendInfo)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -878,9 +878,9 @@ class LogManager(logDirs: Seq[File],
|
|||
def allLogs: Iterable[Log] = currentLogs.values ++ futureLogs.values
|
||||
|
||||
def logsByTopic(topic: String): Seq[Log] = {
|
||||
(currentLogs.toList ++ futureLogs.toList).filter { case (topicPartition, log) =>
|
||||
(currentLogs.toList ++ futureLogs.toList).filter { case (topicPartition, _) =>
|
||||
topicPartition.topic() == topic
|
||||
}.map { case (topicPartition, log) => log }
|
||||
}.map { case (_, log) => log }
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -582,7 +582,7 @@ class ProducerStateManager(val topicPartition: TopicPartition,
|
|||
* Expire any producer ids which have been idle longer than the configured maximum expiration timeout.
|
||||
*/
|
||||
def removeExpiredProducers(currentTimeMs: Long) {
|
||||
producers.retain { case (producerId, lastEntry) =>
|
||||
producers.retain { case (_, lastEntry) =>
|
||||
!isProducerExpired(currentTimeMs, lastEntry)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -512,7 +512,7 @@ class KafkaApis(val requestChannel: RequestChannel,
|
|||
}
|
||||
})
|
||||
} else {
|
||||
fetchContext.foreachPartition((part, data) => {
|
||||
fetchContext.foreachPartition((part, _) => {
|
||||
erroneous += part -> new FetchResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED,
|
||||
FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET,
|
||||
FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY)
|
||||
|
|
|
|||
|
|
@ -150,7 +150,7 @@ class ReplicaAlterLogDirsThread(name: String,
|
|||
.filter { case (_, state) => state.isTruncatingLog }
|
||||
.map { case (tp, _) => tp -> epochCacheOpt(tp) }.toMap
|
||||
|
||||
val (partitionsWithEpoch, partitionsWithoutEpoch) = partitionEpochOpts.partition { case (tp, epochCacheOpt) => epochCacheOpt.nonEmpty }
|
||||
val (partitionsWithEpoch, partitionsWithoutEpoch) = partitionEpochOpts.partition { case (_, epochCacheOpt) => epochCacheOpt.nonEmpty }
|
||||
|
||||
val result = partitionsWithEpoch.map { case (tp, epochCacheOpt) => tp -> epochCacheOpt.get.latestEpoch() }
|
||||
ResultWithPartitions(result, partitionsWithoutEpoch.keys.toSet)
|
||||
|
|
@ -217,7 +217,7 @@ class ReplicaAlterLogDirsThread(name: String,
|
|||
|
||||
def buildFetchRequest(partitionMap: Seq[(TopicPartition, PartitionFetchState)]): ResultWithPartitions[FetchRequest] = {
|
||||
// Only include replica in the fetch request if it is not throttled.
|
||||
val maxPartitionOpt = partitionMap.filter { case (topicPartition, partitionFetchState) =>
|
||||
val maxPartitionOpt = partitionMap.filter { case (_, partitionFetchState) =>
|
||||
partitionFetchState.isReadyForFetch && !quota.isQuotaExceeded
|
||||
}.reduceLeftOption { (left, right) =>
|
||||
if ((left._1.topic > right._1.topic()) || (left._1.topic == right._1.topic() && left._1.partition() >= right._1.partition()))
|
||||
|
|
@ -237,7 +237,7 @@ class ReplicaAlterLogDirsThread(name: String,
|
|||
val logStartOffset = replicaMgr.getReplicaOrException(topicPartition, Request.FutureLocalReplicaId).logStartOffset
|
||||
requestMap.put(topicPartition, new JFetchRequest.PartitionData(partitionFetchState.fetchOffset, logStartOffset, fetchSize))
|
||||
} catch {
|
||||
case e: KafkaStorageException =>
|
||||
case _: KafkaStorageException =>
|
||||
partitionsWithError += topicPartition
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -333,7 +333,7 @@ class ReplicaFetcherThread(name: String,
|
|||
.filter { case (_, state) => state.isTruncatingLog }
|
||||
.map { case (tp, _) => tp -> epochCacheOpt(tp) }.toMap
|
||||
|
||||
val (partitionsWithEpoch, partitionsWithoutEpoch) = partitionEpochOpts.partition { case (tp, epochCacheOpt) => epochCacheOpt.nonEmpty }
|
||||
val (partitionsWithEpoch, partitionsWithoutEpoch) = partitionEpochOpts.partition { case (_, epochCacheOpt) => epochCacheOpt.nonEmpty }
|
||||
|
||||
debug(s"Build leaderEpoch request $partitionsWithEpoch")
|
||||
val result = partitionsWithEpoch.map { case (tp, epochCacheOpt) => tp -> epochCacheOpt.get.latestEpoch() }
|
||||
|
|
|
|||
|
|
@ -560,7 +560,7 @@ class ReplicaManager(val config: KafkaConfig,
|
|||
// 1. the delete records operation on this partition is successful
|
||||
// 2. low watermark of this partition is smaller than the specified offset
|
||||
private def delayedDeleteRecordsRequired(localDeleteRecordsResults: Map[TopicPartition, LogDeleteRecordsResult]): Boolean = {
|
||||
localDeleteRecordsResults.exists{ case (tp, deleteRecordsResult) =>
|
||||
localDeleteRecordsResults.exists{ case (_, deleteRecordsResult) =>
|
||||
deleteRecordsResult.exception.isEmpty && deleteRecordsResult.lowWatermark < deleteRecordsResult.requestedOffset
|
||||
}
|
||||
}
|
||||
|
|
@ -654,7 +654,7 @@ class ReplicaManager(val config: KafkaConfig,
|
|||
}
|
||||
|
||||
} catch {
|
||||
case e: KafkaStorageException =>
|
||||
case _: KafkaStorageException =>
|
||||
(absolutePath, new LogDirInfo(Errors.KAFKA_STORAGE_ERROR, Map.empty[TopicPartition, ReplicaInfo].asJava))
|
||||
case t: Throwable =>
|
||||
error(s"Error while describing replica in dir $absolutePath", t)
|
||||
|
|
|
|||
|
|
@ -372,8 +372,7 @@ class AdminZkClient(zkClient: KafkaZkClient) extends Logging {
|
|||
*/
|
||||
def changeBrokerConfig(broker: Option[Int], configs: Properties): Unit = {
|
||||
validateBrokerConfig(configs)
|
||||
val entityName = broker.map(_.toString).getOrElse(ConfigEntityName.Default)
|
||||
changeEntityConfig(ConfigType.Broker, broker.map(String.valueOf).getOrElse(ConfigEntityName.Default), configs)
|
||||
changeEntityConfig(ConfigType.Broker, broker.map(_.toString).getOrElse(ConfigEntityName.Default), configs)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -1315,7 +1315,7 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
|
|||
createRecursive(ClusterIdZNode.path, ClusterIdZNode.toJson(proposedClusterId))
|
||||
proposedClusterId
|
||||
} catch {
|
||||
case e: NodeExistsException => getClusterId.getOrElse(
|
||||
case _: NodeExistsException => getClusterId.getOrElse(
|
||||
throw new KafkaException("Failed to get cluster id from Zookeeper. This can happen if /cluster/id is deleted from Zookeeper."))
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue