MINOR: Use `Map.forKeyValue` to avoid tuple allocation in Scala 2.13 (#9299)

`forKeyValue` invokes `foreachEntry` in Scala 2.13 and falls back to
`foreach` in Scala 2.12.

This change requires a newer version of scala-collection-compat, so
update it to the latest version (2.2.0).

Finally, included a minor clean-up in `GetOffsetShell` to use `toArray`
before `sortBy` since it's more efficient.

Reviewers: Jason Gustafson <jason@confluent.io>, David Jacot <djacot@confluent.io>, José Armando García Sancio <jsancio@users.noreply.github.com>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Ismael Juma 2020-09-21 16:04:19 -07:00 committed by GitHub
parent 5cf5cc2fc0
commit ac89e49bf2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
38 changed files with 166 additions and 111 deletions

View File

@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.{ConcurrentLinkedQueue, TimeUnit}
import kafka.utils.{CommandDefaultOptions, CommandLineUtils}
import kafka.utils.Implicits._
import kafka.utils.Logging
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.clients.{ApiVersions, ClientDnsLookup, ClientResponse, ClientUtils, CommonClientConfigs, Metadata, NetworkClient, NodeApiVersions}
@ -59,7 +60,7 @@ object BrokerApiVersionsCommand {
val adminClient = createAdminClient(opts)
adminClient.awaitBrokers()
val brokerMap = adminClient.listAllBrokerVersionInfo()
brokerMap.foreach { case (broker, versionInfoOrError) =>
brokerMap.forKeyValue { (broker, versionInfoOrError) =>
versionInfoOrError match {
case Success(v) => out.print(s"${broker} -> ${v.toString(true)}\n")
case Failure(v) => out.print(s"${broker} -> ERROR: ${v}\n")

View File

@ -550,7 +550,7 @@ object ConfigCommand extends Config {
private def describeClientQuotaAndUserScramCredentialConfigs(adminClient: Admin, entityTypes: List[String], entityNames: List[String]) = {
val quotaConfigs = getAllClientQuotasConfigs(adminClient, entityTypes, entityNames)
quotaConfigs.foreach { case (entity, entries) =>
quotaConfigs.forKeyValue { (entity, entries) =>
val entityEntries = entity.entries.asScala
def entitySubstr(entityType: String): Option[String] =

View File

@ -25,6 +25,7 @@ import com.fasterxml.jackson.dataformat.csv.CsvMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import kafka.utils._
import kafka.utils.Implicits._
import org.apache.kafka.clients.admin._
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.clients.CommonClientConfigs
@ -709,7 +710,7 @@ object ConsumerGroupCommand extends Logging {
private def createAdminClient(configOverrides: Map[String, String]): Admin = {
val props = if (opts.options.has(opts.commandConfigOpt)) Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt)) else new Properties()
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, opts.options.valueOf(opts.bootstrapServerOpt))
configOverrides.foreach { case (k, v) => props.put(k, v)}
configOverrides.forKeyValue { (k, v) => props.put(k, v)}
Admin.create(props)
}

View File

@ -23,6 +23,7 @@ import kafka.common.AdminCommandFailedException
import kafka.utils.CommandDefaultOptions
import kafka.utils.CommandLineUtils
import kafka.utils.CoreUtils
import kafka.utils.Implicits._
import kafka.utils.Json
import kafka.utils.Logging
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig}
@ -171,7 +172,7 @@ object LeaderElectionCommand extends Logging {
if (failed.nonEmpty) {
val rootException = new AdminCommandFailedException(s"${failed.size} replica(s) could not be elected")
failed.foreach { case (topicPartition, exception) =>
failed.forKeyValue { (topicPartition, exception) =>
println(s"Error completing leader election ($electionType) for partition: $topicPartition: $exception")
rootException.addSuppressed(exception)
}

View File

@ -24,6 +24,7 @@ import java.util.concurrent.ExecutionException
import joptsimple.OptionSpecBuilder
import kafka.common.AdminCommandFailedException
import kafka.utils._
import kafka.utils.Implicits._
import kafka.zk.KafkaZkClient
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig}
import org.apache.kafka.common.ElectionType
@ -85,7 +86,7 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
}
}
def parsePreferredReplicaElectionData(jsonString: String): immutable.Set[TopicPartition] = {
def parsePreferredReplicaElectionData(jsonString: String): collection.immutable.Set[TopicPartition] = {
Json.parseFull(jsonString) match {
case Some(js) =>
js.asJsonObject.get("partitions") match {
@ -269,7 +270,7 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
if (!failed.isEmpty) {
val rootException = new AdminCommandFailedException(s"${failed.size} preferred replica(s) could not be elected")
failed.foreach { case (topicPartition, exception) =>
failed.forKeyValue { (topicPartition, exception) =>
println(s"Error completing preferred leader election for partition: $topicPartition: $exception")
rootException.addSuppressed(exception)
}

View File

@ -24,6 +24,7 @@ import kafka.common.AdminCommandFailedException
import kafka.log.LogConfig
import kafka.server.{ConfigType, DynamicConfig}
import kafka.utils.{CommandDefaultOptions, CommandLineUtils, CoreUtils, Exit, Json, Logging}
import kafka.utils.Implicits._
import kafka.utils.json.JsonValue
import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.clients.admin.AlterConfigOp.OpType
@ -798,7 +799,7 @@ object ReassignPartitionsCommand extends Logging {
: Map[TopicPartition, Seq[Int]] = {
val groupedByTopic = currentAssignment.groupBy { case (tp, _) => tp.topic }
val proposedAssignments = mutable.Map[TopicPartition, Seq[Int]]()
groupedByTopic.foreach { case (topic, assignment) =>
groupedByTopic.forKeyValue { (topic, assignment) =>
val (_, replicas) = assignment.head
val assignedReplicas = AdminUtils.
assignReplicasToBrokers(brokerMetadatas, assignment.size, replicas.size)

View File

@ -20,6 +20,7 @@ package kafka.admin
import joptsimple.{ArgumentAcceptingOptionSpec, OptionSet}
import kafka.server.KafkaConfig
import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Exit, Logging}
import kafka.utils.Implicits._
import kafka.zk.{ControllerZNode, KafkaZkClient, ZkData, ZkSecurityMigratorUtils}
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.utils.{Time, Utils}
@ -128,7 +129,7 @@ object ZkSecurityMigrator extends Logging {
// Now override any set system properties with explicitly-provided values from the config file
// Emit INFO logs due to camel-case property names encouraging mistakes -- help people see mistakes they make
info(s"Found ${zkTlsConfigFileProps.size()} ZooKeeper client configuration properties in file $filename")
zkTlsConfigFileProps.asScala.foreach { case (key, value) =>
zkTlsConfigFileProps.asScala.forKeyValue { (key, value) =>
info(s"Setting $key")
KafkaConfig.setZooKeeperClientProperty(zkClientConfig, key, value)
}

View File

@ -25,6 +25,7 @@ import kafka.cluster.Broker
import kafka.metrics.KafkaMetricsGroup
import kafka.server.KafkaConfig
import kafka.utils._
import kafka.utils.Implicits._
import org.apache.kafka.clients._
import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
import org.apache.kafka.common.message.StopReplicaRequestData.{StopReplicaPartitionState, StopReplicaTopicState}
@ -460,11 +461,11 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,
else if (config.interBrokerProtocolVersion >= KAFKA_1_0_IV0) 1
else 0
leaderAndIsrRequestMap.foreach { case (broker, leaderAndIsrPartitionStates) =>
leaderAndIsrRequestMap.forKeyValue { (broker, leaderAndIsrPartitionStates) =>
if (controllerContext.liveOrShuttingDownBrokerIds.contains(broker)) {
val leaderIds = mutable.Set.empty[Int]
var numBecomeLeaders = 0
leaderAndIsrPartitionStates.foreach { case (topicPartition, state) =>
leaderAndIsrPartitionStates.forKeyValue { (topicPartition, state) =>
leaderIds += state.leader
val typeOfRequest = if (broker == state.leader) {
numBecomeLeaders += 1
@ -570,10 +571,10 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,
partitionErrorsForDeletingTopics))
}
stopReplicaRequestMap.foreach { case (brokerId, partitionStates) =>
stopReplicaRequestMap.forKeyValue { (brokerId, partitionStates) =>
if (controllerContext.liveOrShuttingDownBrokerIds.contains(brokerId)) {
if (traceEnabled)
partitionStates.foreach { case (topicPartition, partitionState) =>
partitionStates.forKeyValue { (topicPartition, partitionState) =>
stateChangeLog.trace(s"Sending StopReplica request $partitionState to " +
s"broker $brokerId for partition $topicPartition")
}
@ -581,7 +582,7 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,
val brokerEpoch = controllerContext.liveBrokerIdAndEpochs(brokerId)
if (stopReplicaRequestVersion >= 3) {
val stopReplicaTopicState = mutable.Map.empty[String, StopReplicaTopicState]
partitionStates.foreach { case (topicPartition, partitionState) =>
partitionStates.forKeyValue { (topicPartition, partitionState) =>
val topicState = stopReplicaTopicState.getOrElseUpdate(topicPartition.topic,
new StopReplicaTopicState().setTopicName(topicPartition.topic))
topicState.partitionStates().add(partitionState)
@ -600,7 +601,7 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,
val topicStatesWithDelete = mutable.Map.empty[String, StopReplicaTopicState]
val topicStatesWithoutDelete = mutable.Map.empty[String, StopReplicaTopicState]
partitionStates.foreach { case (topicPartition, partitionState) =>
partitionStates.forKeyValue { (topicPartition, partitionState) =>
val topicStates = if (partitionState.deletePartition()) {
numPartitionStateWithDelete += 1
topicStatesWithDelete

View File

@ -18,6 +18,7 @@
package kafka.controller
import kafka.cluster.Broker
import kafka.utils.Implicits._
import org.apache.kafka.common.TopicPartition
import scala.collection.{Map, Seq, Set, mutable}
@ -464,7 +465,7 @@ class ControllerContext {
}
private def cleanPreferredReplicaImbalanceMetric(topic: String): Unit = {
partitionAssignments.getOrElse(topic, mutable.Map.empty).foreach { case (partition, replicaAssignment) =>
partitionAssignments.getOrElse(topic, mutable.Map.empty).forKeyValue { (partition, replicaAssignment) =>
partitionLeadershipInfo.get(new TopicPartition(topic, partition)).foreach { leadershipInfo =>
if (!hasPreferredLeader(replicaAssignment, leadershipInfo))
preferredReplicaImbalanceCount -= 1

View File

@ -25,6 +25,7 @@ import kafka.controller.KafkaController.{AlterReassignmentsCallback, ElectLeader
import kafka.metrics.{KafkaMetricsGroup, KafkaTimer}
import kafka.server._
import kafka.utils._
import kafka.utils.Implicits._
import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
import kafka.zk._
import kafka.zookeeper.{StateChangeHandler, ZNodeChangeHandler, ZNodeChildChangeHandler}
@ -793,7 +794,7 @@ class KafkaController(val config: KafkaConfig,
private def updateLeaderAndIsrCache(partitions: Seq[TopicPartition] = controllerContext.allPartitions.toSeq): Unit = {
val leaderIsrAndControllerEpochs = zkClient.getTopicPartitionStates(partitions)
leaderIsrAndControllerEpochs.foreach { case (partition, leaderIsrAndControllerEpoch) =>
leaderIsrAndControllerEpochs.forKeyValue { (partition, leaderIsrAndControllerEpoch) =>
controllerContext.putPartitionLeadershipInfo(partition, leaderIsrAndControllerEpoch)
}
}
@ -1054,7 +1055,7 @@ class KafkaController(val config: KafkaConfig,
}.toMap.groupBy { case (_, assignedReplicas) => assignedReplicas.head }
// for each broker, check if a preferred replica election needs to be triggered
preferredReplicasForTopicsByBrokers.foreach { case (leaderBroker, topicPartitionsForBroker) =>
preferredReplicasForTopicsByBrokers.forKeyValue { (leaderBroker, topicPartitionsForBroker) =>
val topicsNotInPreferredReplica = topicPartitionsForBroker.filter { case (topicPartition, _) =>
val leadershipInfo = controllerContext.partitionLeadershipInfo(topicPartition)
leadershipInfo.exists(_.leaderAndIsr.leader != leaderBroker)
@ -1474,7 +1475,7 @@ class KafkaController(val config: KafkaConfig,
}
} else if (partitionsToBeAdded.nonEmpty) {
info(s"New partitions to be added $partitionsToBeAdded")
partitionsToBeAdded.foreach { case (topicPartition, assignedReplicas) =>
partitionsToBeAdded.forKeyValue { (topicPartition, assignedReplicas) =>
controllerContext.updatePartitionFullReplicaAssignment(topicPartition, assignedReplicas)
}
onNewPartitionCreation(partitionsToBeAdded.keySet)
@ -1519,7 +1520,7 @@ class KafkaController(val config: KafkaConfig,
val reassignmentResults = mutable.Map.empty[TopicPartition, ApiError]
val partitionsToReassign = mutable.Map.empty[TopicPartition, ReplicaAssignment]
zkClient.getPartitionReassignment.foreach { case (tp, targetReplicas) =>
zkClient.getPartitionReassignment.forKeyValue { (tp, targetReplicas) =>
maybeBuildReassignment(tp, Some(targetReplicas)) match {
case Some(context) => partitionsToReassign.put(tp, context)
case None => reassignmentResults.put(tp, new ApiError(Errors.NO_REASSIGNMENT_IN_PROGRESS))
@ -1556,7 +1557,7 @@ class KafkaController(val config: KafkaConfig,
val reassignmentResults = mutable.Map.empty[TopicPartition, ApiError]
val partitionsToReassign = mutable.Map.empty[TopicPartition, ReplicaAssignment]
reassignments.foreach { case (tp, targetReplicas) =>
reassignments.forKeyValue { (tp, targetReplicas) =>
val maybeApiError = targetReplicas.flatMap(validateReplicas(tp, _))
maybeApiError match {
case None =>

View File

@ -20,6 +20,7 @@ import kafka.api.LeaderAndIsr
import kafka.common.StateChangeFailedException
import kafka.controller.Election._
import kafka.server.KafkaConfig
import kafka.utils.Implicits._
import kafka.utils.Logging
import kafka.zk.KafkaZkClient
import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
@ -301,7 +302,7 @@ class ZkPartitionStateMachine(config: KafkaConfig,
error("Controller moved to another broker when trying to create the topic partition state znode", e)
throw e
case e: Exception =>
partitionsWithLiveReplicas.foreach { case (partition,_) => logFailedStateChange(partition, partitionState(partition), NewPartition, e) }
partitionsWithLiveReplicas.foreach { case (partition, _) => logFailedStateChange(partition, partitionState(partition), NewPartition, e) }
Seq.empty
}
createResponses.foreach { createResponse =>
@ -433,7 +434,7 @@ class ZkPartitionStateMachine(config: KafkaConfig,
val adjustedLeaderAndIsrs = partitionsWithLeaders.map(result => result.topicPartition -> result.leaderAndIsr.get).toMap
val UpdateLeaderAndIsrResult(finishedUpdates, updatesToRetry) = zkClient.updateLeaderAndIsr(
adjustedLeaderAndIsrs, controllerContext.epoch, controllerContext.epochZkVersion)
finishedUpdates.foreach { case (partition, result) =>
finishedUpdates.forKeyValue { (partition, result) =>
result.foreach { leaderAndIsr =>
val replicaAssignment = controllerContext.partitionFullReplicaAssignment(partition)
val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerContext.epoch)

View File

@ -19,6 +19,7 @@ package kafka.controller
import kafka.api.LeaderAndIsr
import kafka.common.StateChangeFailedException
import kafka.server.KafkaConfig
import kafka.utils.Implicits._
import kafka.utils.Logging
import kafka.zk.KafkaZkClient
import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
@ -107,7 +108,7 @@ class ZkReplicaStateMachine(config: KafkaConfig,
if (replicas.nonEmpty) {
try {
controllerBrokerRequestBatch.newBatch()
replicas.groupBy(_.replica).foreach { case (replicaId, replicas) =>
replicas.groupBy(_.replica).forKeyValue { (replicaId, replicas) =>
doHandleStateChanges(replicaId, replicas, targetState)
}
controllerBrokerRequestBatch.sendRequestsToBrokers(controllerContext.epoch)
@ -224,7 +225,7 @@ class ZkReplicaStateMachine(config: KafkaConfig,
controllerContext.partitionLeadershipInfo(replica.topicPartition).isDefined
}
val updatedLeaderIsrAndControllerEpochs = removeReplicasFromIsr(replicaId, replicasWithLeadershipInfo.map(_.topicPartition))
updatedLeaderIsrAndControllerEpochs.foreach { case (partition, leaderIsrAndControllerEpoch) =>
updatedLeaderIsrAndControllerEpochs.forKeyValue { (partition, leaderIsrAndControllerEpoch) =>
stateLogger.info(s"Partition $partition state changed to $leaderIsrAndControllerEpoch after removing replica $replicaId from the ISR as part of transition to $OfflineReplica")
if (!controllerContext.isTopicQueuedUpForDeletion(partition.topic)) {
val recipients = controllerContext.partitionReplicaAssignment(partition).filterNot(_ == replicaId)

View File

@ -22,6 +22,7 @@ import java.util.concurrent.locks.ReentrantLock
import kafka.common.OffsetAndMetadata
import kafka.utils.{CoreUtils, Logging, nonthreadsafe}
import kafka.utils.Implicits._
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember
@ -616,7 +617,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
val producerOffsets = pendingTransactionalOffsetCommits.getOrElseUpdate(producerId,
mutable.Map.empty[TopicPartition, CommitRecordMetadataAndOffset])
offsets.foreach { case (topicPartition, offsetAndMetadata) =>
offsets.forKeyValue { (topicPartition, offsetAndMetadata) =>
producerOffsets.put(topicPartition, CommitRecordMetadataAndOffset(None, offsetAndMetadata))
}
}
@ -660,7 +661,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
val pendingOffsetsOpt = pendingTransactionalOffsetCommits.remove(producerId)
if (isCommit) {
pendingOffsetsOpt.foreach { pendingOffsets =>
pendingOffsets.foreach { case (topicPartition, commitRecordMetadataAndOffset) =>
pendingOffsets.forKeyValue { (topicPartition, commitRecordMetadataAndOffset) =>
if (commitRecordMetadataAndOffset.appendedBatchOffset.isEmpty)
throw new IllegalStateException(s"Trying to complete a transactional offset commit for producerId $producerId " +
s"and groupId $groupId even though the offset commit record itself hasn't been appended to the log.")
@ -698,7 +699,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
def removeOffsets(topicPartitions: Seq[TopicPartition]): immutable.Map[TopicPartition, OffsetAndMetadata] = {
topicPartitions.flatMap { topicPartition =>
pendingOffsetCommits.remove(topicPartition)
pendingTransactionalOffsetCommits.foreach { case (_, pendingOffsets) =>
pendingTransactionalOffsetCommits.forKeyValue { (_, pendingOffsets) =>
pendingOffsets.remove(topicPartition)
}
val removedOffset = offsets.remove(topicPartition)

View File

@ -33,6 +33,7 @@ import kafka.metrics.KafkaMetricsGroup
import kafka.server.{FetchLogEnd, ReplicaManager}
import kafka.utils.CoreUtils.inLock
import kafka.utils._
import kafka.utils.Implicits._
import kafka.zk.KafkaZkClient
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
@ -402,7 +403,7 @@ class GroupMetadataManager(brokerId: Int,
val responseError = group.inLock {
if (status.error == Errors.NONE) {
if (!group.is(Dead)) {
filteredOffsetMetadata.foreach { case (topicPartition, offsetAndMetadata) =>
filteredOffsetMetadata.forKeyValue { (topicPartition, offsetAndMetadata) =>
if (isTxnOffsetCommit)
group.onTxnOffsetCommitAppend(producerId, topicPartition, CommitRecordMetadataAndOffset(Some(status.baseOffset), offsetAndMetadata))
else
@ -414,7 +415,7 @@ class GroupMetadataManager(brokerId: Int,
if (!group.is(Dead)) {
if (!group.hasPendingOffsetCommitsFromProducer(producerId))
removeProducerGroup(producerId, group.groupId)
filteredOffsetMetadata.foreach { case (topicPartition, offsetAndMetadata) =>
filteredOffsetMetadata.forKeyValue { (topicPartition, offsetAndMetadata) =>
if (isTxnOffsetCommit)
group.failPendingTxnOffsetCommit(producerId, topicPartition)
else
@ -683,11 +684,11 @@ class GroupMetadataManager(brokerId: Int,
}.partition { case (group, _) => loadedGroups.contains(group) }
val pendingOffsetsByGroup = mutable.Map[String, mutable.Map[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]]]()
pendingOffsets.foreach { case (producerId, producerOffsets) =>
pendingOffsets.forKeyValue { (producerId, producerOffsets) =>
producerOffsets.keySet.map(_.group).foreach(addProducerGroup(producerId, _))
producerOffsets
.groupBy(_._1.group)
.foreach { case (group, offsets) =>
.forKeyValue { (group, offsets) =>
val groupPendingOffsets = pendingOffsetsByGroup.getOrElseUpdate(group, mutable.Map.empty[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]])
val groupProducerOffsets = groupPendingOffsets.getOrElseUpdate(producerId, mutable.Map.empty[TopicPartition, CommitRecordMetadataAndOffset])
groupProducerOffsets ++= offsets.map { case (groupTopicPartition, offset) =>
@ -821,7 +822,7 @@ class GroupMetadataManager(brokerId: Int,
replicaManager.nonOfflinePartition(appendPartition).foreach { partition =>
val tombstones = ArrayBuffer.empty[SimpleRecord]
removedOffsets.foreach { case (topicPartition, offsetAndMetadata) =>
removedOffsets.forKeyValue { (topicPartition, offsetAndMetadata) =>
trace(s"Removing expired/deleted offset and metadata for $groupId, $topicPartition: $offsetAndMetadata")
val commitKey = GroupMetadataManager.offsetCommitKey(groupId, topicPartition)
tombstones += new SimpleRecord(timestamp, commitKey, null)
@ -913,7 +914,7 @@ class GroupMetadataManager(brokerId: Int,
}
private def removeGroupFromAllProducers(groupId: String) = openGroupsForProducer synchronized {
openGroupsForProducer.foreach { case (_, groups) =>
openGroupsForProducer.forKeyValue { (_, groups) =>
groups.remove(groupId)
}
}

View File

@ -24,6 +24,7 @@ import kafka.common.{InterBrokerSendThread, RequestAndCompletionHandler}
import kafka.metrics.KafkaMetricsGroup
import kafka.server.{KafkaConfig, MetadataCache}
import kafka.utils.{CoreUtils, Logging}
import kafka.utils.Implicits._
import org.apache.kafka.clients._
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network._
@ -115,7 +116,7 @@ class TxnMarkerQueue(@volatile var destination: Node) {
}
def forEachTxnTopicPartition[B](f:(Int, BlockingQueue[TxnIdAndMarkerEntry]) => B): Unit =
markersPerTxnTopicPartition.foreach { case (partition, queue) =>
markersPerTxnTopicPartition.forKeyValue { (partition, queue) =>
if (!queue.isEmpty) f(partition, queue)
}

View File

@ -27,6 +27,7 @@ import kafka.message.UncompressedCodec
import kafka.server.{Defaults, FetchLogEnd, ReplicaManager}
import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
import kafka.utils.{Logging, Pool, Scheduler}
import kafka.utils.Implicits._
import kafka.zk.KafkaZkClient
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.metrics.Metrics
@ -173,7 +174,7 @@ class TransactionStateManager(brokerId: Int,
}
def removeFromCacheCallback(responses: collection.Map[TopicPartition, PartitionResponse]): Unit = {
responses.foreach { case (topicPartition, response) =>
responses.forKeyValue { (topicPartition, response) =>
inReadLock(stateLock) {
val toRemove = transactionalIdByPartition(topicPartition.partition)
transactionMetadataCache.get(topicPartition.partition).foreach { txnMetadataCacheEntry =>

View File

@ -27,6 +27,7 @@ import kafka.log.LogConfig
import kafka.metrics.KafkaMetricsGroup
import kafka.server.KafkaConfig
import kafka.utils.{Logging, NotNothing, Pool}
import kafka.utils.Implicits._
import org.apache.kafka.common.config.types.Password
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.memory.MemoryPool
@ -394,7 +395,7 @@ class RequestChannel(val queueSize: Int,
requestQueue.take()
def updateErrorMetrics(apiKey: ApiKeys, errors: collection.Map[Errors, Integer]): Unit = {
errors.foreach { case (error, count) =>
errors.forKeyValue { (error, count) =>
metrics(apiKey.name).markErrorMeter(error, count)
}
}

View File

@ -34,6 +34,7 @@ import kafka.network.SocketServer._
import kafka.security.CredentialProvider
import kafka.server.{BrokerReconfigurable, KafkaConfig}
import kafka.utils._
import kafka.utils.Implicits._
import org.apache.kafka.common.config.ConfigException
import org.apache.kafka.common.{Endpoint, KafkaException, MetricName, Reconfigurable}
import org.apache.kafka.common.memory.{MemoryPool, SimpleMemoryPool}
@ -404,7 +405,7 @@ class SocketServer(val config: KafkaConfig,
private def waitForAuthorizerFuture(acceptor: Acceptor,
authorizerFutures: Map[Endpoint, CompletableFuture[Void]]): Unit = {
//we can't rely on authorizerFutures.get() due to ephemeral ports. Get the future using listener name
authorizerFutures.foreach { case (endpoint, future) =>
authorizerFutures.forKeyValue { (endpoint, future) =>
if (endpoint.listenerName == Optional.of(acceptor.endPoint.listenerName.value))
future.join()
}

View File

@ -25,6 +25,7 @@ import kafka.security.authorizer.AclAuthorizer.{AclSeqs, ResourceOrdering, Versi
import kafka.security.authorizer.AclEntry.ResourceSeparator
import kafka.server.{KafkaConfig, KafkaServer}
import kafka.utils._
import kafka.utils.Implicits._
import kafka.zk._
import org.apache.kafka.common.Endpoint
import org.apache.kafka.common.acl._
@ -144,7 +145,7 @@ class AclAuthorizer extends Authorizer with Logging {
override def configure(javaConfigs: util.Map[String, _]): Unit = {
val configs = javaConfigs.asScala
val props = new java.util.Properties()
configs.foreach { case (key, value) => props.put(key, value.toString) }
configs.forKeyValue { (key, value) => props.put(key, value.toString) }
superUsers = configs.get(AclAuthorizer.SuperUsersProp).collect {
case str: String if str.nonEmpty => str.split(";").map(s => SecurityUtils.parseKafkaPrincipal(s.trim)).toSet
@ -188,28 +189,27 @@ class AclAuthorizer extends Authorizer with Logging {
override def createAcls(requestContext: AuthorizableRequestContext,
aclBindings: util.List[AclBinding]): util.List[_ <: CompletionStage[AclCreateResult]] = {
val results = new Array[AclCreateResult](aclBindings.size)
val aclsToCreate = aclBindings.asScala.zipWithIndex
.filter { case (aclBinding, i) =>
try {
if (!extendedAclSupport && aclBinding.pattern.patternType == PatternType.PREFIXED) {
throw new UnsupportedVersionException(s"Adding ACLs on prefixed resource patterns requires " +
s"${KafkaConfig.InterBrokerProtocolVersionProp} of $KAFKA_2_0_IV1 or greater")
}
AuthorizerUtils.validateAclBinding(aclBinding)
true
} catch {
case e: Throwable =>
results(i) = new AclCreateResult(new InvalidRequestException("Failed to create ACL", apiException(e)))
false
val aclsToCreate = aclBindings.asScala.zipWithIndex.filter { case (aclBinding, i) =>
try {
if (!extendedAclSupport && aclBinding.pattern.patternType == PatternType.PREFIXED) {
throw new UnsupportedVersionException(s"Adding ACLs on prefixed resource patterns requires " +
s"${KafkaConfig.InterBrokerProtocolVersionProp} of $KAFKA_2_0_IV1 or greater")
}
}.groupBy(_._1.pattern)
AuthorizerUtils.validateAclBinding(aclBinding)
true
} catch {
case e: Throwable =>
results(i) = new AclCreateResult(new InvalidRequestException("Failed to create ACL", apiException(e)))
false
}
}.groupBy(_._1.pattern)
if (aclsToCreate.nonEmpty) {
lock synchronized {
aclsToCreate.foreach { case (resource, aclsWithIndex) =>
aclsToCreate.forKeyValue { (resource, aclsWithIndex) =>
try {
updateResourceAcls(resource) { currentAcls =>
val newAcls = aclsWithIndex.map { case (acl, index) => new AclEntry(acl.entry) }
val newAcls = aclsWithIndex.map { case (acl, _) => new AclEntry(acl.entry) }
currentAcls ++ newAcls
}
aclsWithIndex.foreach { case (_, index) => results(index) = AclCreateResult.SUCCESS }
@ -254,7 +254,7 @@ class AclAuthorizer extends Authorizer with Logging {
resource -> matchingFilters
}.toMap.filter(_._2.nonEmpty)
resourcesToUpdate.foreach { case (resource, matchingFilters) =>
resourcesToUpdate.forKeyValue { (resource, matchingFilters) =>
val resourceBindingsBeingDeleted = new mutable.HashMap[AclBinding, Int]()
try {
updateResourceAcls(resource) { currentAcls =>
@ -289,8 +289,10 @@ class AclAuthorizer extends Authorizer with Logging {
@nowarn("cat=optimizer")
override def acls(filter: AclBindingFilter): lang.Iterable[AclBinding] = {
val aclBindings = new util.ArrayList[AclBinding]()
aclCache.foreach { case (resource, versionedAcls) =>
val aclBindings = new util.ArrayList[AclBinding]()
// Using `forKeyValue` triggers a scalac bug related to suppression of optimizer warnings, we
// should change this code once that's fixed
aclCache.foreach { case (resource, versionedAcls) =>
versionedAcls.acls.foreach { acl =>
val binding = new AclBinding(resource, acl.ace)
if (filter.matches(binding))
@ -368,7 +370,6 @@ class AclAuthorizer extends Authorizer with Logging {
}
@nowarn("cat=deprecation")
@nowarn("cat=optimizer")
private def matchingAcls(resourceType: ResourceType, resourceName: String): AclSeqs = {
// this code is performance sensitive, make sure to run AclAuthorizerBenchmark after any changes
@ -386,7 +387,7 @@ class AclAuthorizer extends Authorizer with Logging {
aclCacheSnapshot
.from(new ResourcePattern(resourceType, resourceName, PatternType.PREFIXED))
.to(new ResourcePattern(resourceType, resourceName.take(1), PatternType.PREFIXED))
.foreach { case (resource, acls) =>
.forKeyValue { (resource, acls) =>
if (resourceName.startsWith(resource.name)) prefixed ++= acls.acls
}

View File

@ -17,6 +17,7 @@
package kafka.server
import kafka.utils.Implicits._
import kafka.utils.Logging
import kafka.cluster.BrokerEndPoint
import kafka.metrics.KafkaMetricsGroup
@ -62,7 +63,7 @@ abstract class AbstractFetcherManager[T <: AbstractFetcherThread](val name: Stri
def resizeThreadPool(newSize: Int): Unit = {
def migratePartitions(newSize: Int): Unit = {
fetcherThreadMap.foreach { case (id, thread) =>
fetcherThreadMap.forKeyValue { (id, thread) =>
val removedPartitions = thread.partitionsAndOffsets
removeFetcherForPartitions(removedPartitions.keySet)
if (id.fetcherId >= newSize)

View File

@ -24,6 +24,7 @@ import java.util.concurrent.locks.ReentrantLock
import kafka.cluster.BrokerEndPoint
import kafka.utils.{DelayedItem, Pool, ShutdownableThread}
import kafka.utils.Implicits._
import org.apache.kafka.common.errors._
import org.apache.kafka.common.requests.EpochEndOffset._
import kafka.common.ClientIdAndBroker
@ -248,7 +249,7 @@ abstract class AbstractFetcherThread(name: String,
val fetchOffsets = mutable.HashMap.empty[TopicPartition, OffsetTruncationState]
val partitionsWithError = mutable.HashSet.empty[TopicPartition]
fetchedEpochs.foreach { case (tp, leaderEpochOffset) =>
fetchedEpochs.forKeyValue { (tp, leaderEpochOffset) =>
leaderEpochOffset.error match {
case Errors.NONE =>
val offsetTruncationState = getOffsetTruncationState(tp, leaderEpochOffset)
@ -316,7 +317,7 @@ abstract class AbstractFetcherThread(name: String,
if (responseData.nonEmpty) {
// process fetched data
inLock(partitionMapLock) {
responseData.foreach { case (topicPartition, partitionData) =>
responseData.forKeyValue { (topicPartition, partitionData) =>
Option(partitionStates.stateValue(topicPartition)).foreach { currentFetchState =>
// It's possible that a partition is removed and re-added or truncated when there is a pending fetch request.
// In this case, we only want to process the fetch response if the partition state is ready for fetch and
@ -430,7 +431,7 @@ abstract class AbstractFetcherThread(name: String,
try {
failedPartitions.removeAll(initialFetchStates.keySet)
initialFetchStates.foreach { case (tp, initialFetchState) =>
initialFetchStates.forKeyValue { (tp, initialFetchState) =>
// We can skip the truncation step iff the leader epoch matches the existing epoch
val currentState = partitionStates.stateValue(tp)
val updatedState = if (currentState != null && currentState.currentLeaderEpoch == initialFetchState.leaderEpoch) {

View File

@ -26,6 +26,7 @@ import kafka.utils.Log4jController
import kafka.metrics.KafkaMetricsGroup
import kafka.server.DynamicConfig.QuotaConfigs
import kafka.utils._
import kafka.utils.Implicits._
import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.clients.admin.{AlterConfigOp, ScramMechanism}
import org.apache.kafka.clients.admin.AlterConfigOp.OpType
@ -1149,7 +1150,7 @@ class AdminManager(val config: KafkaConfig,
}
).toMap
illegalRequestsByUser.foreach { case (user, errorMessage) =>
illegalRequestsByUser.forKeyValue { (user, errorMessage) =>
retval.results.add(new AlterUserScramCredentialsResult().setUser(user)
.setErrorCode(if (errorMessage == unknownScramMechanismMsg) {Errors.UNSUPPORTED_SASL_MECHANISM.code} else {Errors.UNACCEPTABLE_CREDENTIAL.code})
.setErrorMessage(errorMessage)) }
@ -1214,7 +1215,7 @@ class AdminManager(val config: KafkaConfig,
}).collect { case (user: String, exception: Exception) => (user, exception) }.toMap
// report failures
usersFailedToPrepareProperties.++(usersFailedToPersist).foreach { case (user, exception) =>
usersFailedToPrepareProperties.++(usersFailedToPersist).forKeyValue { (user, exception) =>
val error = Errors.forException(exception)
retval.results.add(new AlterUserScramCredentialsResult()
.setUser(user)

View File

@ -26,6 +26,7 @@ import kafka.log.{LogConfig, LogManager}
import kafka.security.CredentialProvider
import kafka.server.Constants._
import kafka.server.QuotaFactory.QuotaManagers
import kafka.utils.Implicits._
import kafka.utils.Logging
import org.apache.kafka.common.config.ConfigDef.Validator
import org.apache.kafka.common.config.ConfigException
@ -58,7 +59,7 @@ class TopicConfigHandler(private val logManager: LogManager, kafkaConfig: KafkaC
if (logs.nonEmpty) {
/* combine the default properties with the overrides in zk to create the new LogConfig */
val props = new Properties()
topicConfig.asScala.foreach { case (key, value) =>
topicConfig.asScala.forKeyValue { (key, value) =>
if (!configNamesToExclude.contains(key)) props.put(key, value)
}
val logConfig = LogConfig.fromProps(logManager.currentDefaultConfig.originals, props)

View File

@ -21,6 +21,7 @@ package kafka.server
import java.util.concurrent.TimeUnit
import kafka.metrics.KafkaMetricsGroup
import kafka.utils.Implicits._
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.message.DeleteRecordsResponseData
import org.apache.kafka.common.protocol.Errors
@ -48,7 +49,7 @@ class DelayedDeleteRecords(delayMs: Long,
extends DelayedOperation(delayMs) {
// first update the acks pending variable according to the error code
deleteRecordsStatus.foreach { case (topicPartition, status) =>
deleteRecordsStatus.forKeyValue { (topicPartition, status) =>
if (status.responseStatus.errorCode == Errors.NONE.code) {
// Timeout error state will be cleared when required acks are received
status.acksPending = true
@ -69,7 +70,7 @@ class DelayedDeleteRecords(delayMs: Long,
*/
override def tryComplete(): Boolean = {
// check for each partition if it still has pending acks
deleteRecordsStatus.foreach { case (topicPartition, status) =>
deleteRecordsStatus.forKeyValue { (topicPartition, status) =>
trace(s"Checking delete records satisfaction for $topicPartition, current status $status")
// skip those partitions that have already been satisfied
if (status.acksPending) {
@ -105,7 +106,7 @@ class DelayedDeleteRecords(delayMs: Long,
}
override def onExpiration(): Unit = {
deleteRecordsStatus.foreach { case (topicPartition, status) =>
deleteRecordsStatus.forKeyValue { (topicPartition, status) =>
if (status.acksPending) {
DelayedDeleteRecordsMetrics.recordExpiration(topicPartition)
}

View File

@ -22,6 +22,7 @@ import java.util.concurrent.locks.Lock
import com.yammer.metrics.core.Meter
import kafka.metrics.KafkaMetricsGroup
import kafka.utils.Implicits._
import kafka.utils.Pool
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.Errors
@ -57,7 +58,7 @@ class DelayedProduce(delayMs: Long,
extends DelayedOperation(delayMs, lockOpt) {
// first update the acks pending variable according to the error code
produceMetadata.produceStatus.foreach { case (topicPartition, status) =>
produceMetadata.produceStatus.forKeyValue { (topicPartition, status) =>
if (status.responseStatus.error == Errors.NONE) {
// Timeout error state will be cleared when required acks are received
status.acksPending = true
@ -81,7 +82,7 @@ class DelayedProduce(delayMs: Long,
*/
override def tryComplete(): Boolean = {
// check for each partition if it still has pending acks
produceMetadata.produceStatus.foreach { case (topicPartition, status) =>
produceMetadata.produceStatus.forKeyValue { (topicPartition, status) =>
trace(s"Checking produce satisfaction for $topicPartition, current status $status")
// skip those partitions that have already been satisfied
if (status.acksPending) {
@ -110,7 +111,7 @@ class DelayedProduce(delayMs: Long,
}
override def onExpiration(): Unit = {
produceMetadata.produceStatus.foreach { case (topicPartition, status) =>
produceMetadata.produceStatus.forKeyValue { (topicPartition, status) =>
if (status.acksPending) {
debug(s"Expiring produce request for partition $topicPartition with status $status")
DelayedProduceMetrics.recordExpiration(topicPartition)

View File

@ -26,6 +26,7 @@ import kafka.log.{LogCleaner, LogConfig, LogManager}
import kafka.network.SocketServer
import kafka.server.DynamicBrokerConfig._
import kafka.utils.{CoreUtils, Logging, PasswordEncoder}
import kafka.utils.Implicits._
import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.common.Reconfigurable
import org.apache.kafka.common.config.{ConfigDef, ConfigException, SslConfigs, AbstractConfig}
@ -168,7 +169,7 @@ object DynamicBrokerConfig {
}
private[server] def addDynamicConfigs(configDef: ConfigDef): Unit = {
KafkaConfig.configKeys.foreach { case (configName, config) =>
KafkaConfig.configKeys.forKeyValue { (configName, config) =>
if (AllDynamicConfigs.contains(configName)) {
configDef.define(config.name, config.`type`, config.defaultValue, config.validator,
config.importance, config.documentation, config.group, config.orderInGroup, config.width,
@ -354,7 +355,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
props.setProperty(configName, passwordEncoder.encode(new Password(value)))
}
}
configProps.asScala.foreach { case (name, value) =>
configProps.asScala.forKeyValue { (name, value) =>
if (isPasswordConfig(name))
encodePassword(name, value)
}
@ -391,7 +392,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
}
}
props.asScala.foreach { case (name, value) =>
props.asScala.forKeyValue { (name, value) =>
if (isPasswordConfig(name))
decodePassword(name, value)
}
@ -406,7 +407,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
val props = persistentProps.clone().asInstanceOf[Properties]
if (props.asScala.keySet.exists(isPasswordConfig)) {
maybeCreatePasswordEncoder(kafkaConfig.passwordEncoderOldSecret).foreach { passwordDecoder =>
persistentProps.asScala.foreach { case (configName, value) =>
persistentProps.asScala.forKeyValue { (configName, value) =>
if (isPasswordConfig(configName) && value != null) {
val decoded = try {
Some(passwordDecoder.decode(value).value)
@ -500,7 +501,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
* `props` (even though `log.roll.hours` is secondary to `log.roll.ms`).
*/
private def overrideProps(props: mutable.Map[String, String], propsOverride: mutable.Map[String, String]): Unit = {
propsOverride.foreach { case (k, v) =>
propsOverride.forKeyValue { (k, v) =>
// Remove synonyms of `k` to ensure the right precedence is applied. But disable `matchListenerOverride`
// so that base configs corresponding to listener configs are not removed. Base configs should not be removed
// since they may be used by other listeners. It is ok to retain them in `props` since base configs cannot be
@ -789,7 +790,7 @@ class DynamicMetricsReporters(brokerId: Int, server: KafkaServer) extends Reconf
updatedConfigs: util.Map[String, _]): Unit = {
val props = new util.HashMap[String, AnyRef]
updatedConfigs.forEach { (k, v) => props.put(k, v.asInstanceOf[AnyRef]) }
propsOverride.foreach { case (k, v) => props.put(k, v) }
propsOverride.forKeyValue { (k, v) => props.put(k, v) }
val reporters = dynamicConfig.currentKafkaConfig.getConfiguredInstances(reporterClasses, classOf[MetricsReporter], props)
reporters.forEach { reporter =>
metrics.addReporter(reporter)

View File

@ -39,6 +39,7 @@ import kafka.network.RequestChannel
import kafka.security.authorizer.{AclEntry, AuthorizerUtils}
import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota}
import kafka.utils.{CoreUtils, Logging}
import kafka.utils.Implicits._
import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry}
import org.apache.kafka.clients.admin.AlterConfigOp.OpType
@ -258,7 +259,7 @@ class KafkaApis(val requestChannel: RequestChannel,
partitionStates)
// Clear the coordinator caches in case we were the leader. In the case of a reassignment, we
// cannot rely on the LeaderAndIsr API for this since it is only sent to active replicas.
result.foreach { case (topicPartition, error) =>
result.forKeyValue { (topicPartition, error) =>
if (error == Errors.NONE) {
if (topicPartition.topic == GROUP_METADATA_TOPIC_NAME
&& partitionStates(topicPartition).deletePartition) {
@ -365,7 +366,7 @@ class KafkaApis(val requestChannel: RequestChannel,
def sendResponseCallback(commitStatus: Map[TopicPartition, Errors]): Unit = {
val combinedCommitStatus = commitStatus ++ unauthorizedTopicErrors ++ nonExistingTopicErrors
if (isDebugEnabled)
combinedCommitStatus.foreach { case (topicPartition, error) =>
combinedCommitStatus.forKeyValue { (topicPartition, error) =>
if (error != Errors.NONE) {
debug(s"Offset commit request with correlation id ${header.correlationId} from client ${header.clientId} " +
s"on partition $topicPartition failed due to ${error.exceptionName}")
@ -536,7 +537,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val mergedResponseStatus = responseStatus ++ unauthorizedTopicResponses ++ nonExistingTopicResponses ++ invalidRequestResponses
var errorInResponse = false
mergedResponseStatus.foreach { case (topicPartition, status) =>
mergedResponseStatus.forKeyValue { (topicPartition, status) =>
if (status.error != Errors.NONE) {
errorInResponse = true
debug("Produce request with correlation id %d from client %s on partition %s failed due to %s".format(
@ -591,7 +592,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
def processingStatsCallback(processingStats: FetchResponseStats): Unit = {
processingStats.foreach { case (tp, info) =>
processingStats.forKeyValue { (tp, info) =>
updateRecordConversionStats(request, tp, info)
}
}
@ -665,12 +666,12 @@ class KafkaApis(val requestChannel: RequestChannel,
}
} else {
// Regular Kafka consumers need READ permission on each partition they are fetching.
val partitionMap = new mutable.ArrayBuffer[(TopicPartition, FetchRequest.PartitionData)]
val partitionDatas = new mutable.ArrayBuffer[(TopicPartition, FetchRequest.PartitionData)]
fetchContext.foreachPartition { (topicPartition, partitionData) =>
partitionMap += topicPartition -> partitionData
partitionDatas += topicPartition -> partitionData
}
val authorizedTopics = filterByAuthorized(request.context, READ, TOPIC, partitionMap)(_._1.topic)
partitionMap.foreach { case (topicPartition, data) =>
val authorizedTopics = filterByAuthorized(request.context, READ, TOPIC, partitionDatas)(_._1.topic)
partitionDatas.foreach { case (topicPartition, data) =>
if (!authorizedTopics.contains(topicPartition.topic))
erroneous += topicPartition -> errorResponse(Errors.TOPIC_AUTHORIZATION_FAILED)
else if (!metadataCache.contains(topicPartition))
@ -1579,7 +1580,7 @@ class KafkaApis(val requestChannel: RequestChannel,
sendResponseMaybeThrottle(request, requestThrottleMs => {
val deletionCollections = new DeletableGroupResultCollection()
groupDeletionResult.foreach { case (groupId, error) =>
groupDeletionResult.forKeyValue { (groupId, error) =>
deletionCollections.add(new DeletableGroupResult()
.setGroupId(groupId)
.setErrorCode(error.code)
@ -1772,7 +1773,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}
def handleCreateTopicsResults(errors: Map[String, ApiError]): Unit = {
errors.foreach { case (topicName, error) =>
errors.forKeyValue { (topicName, error) =>
val result = results.find(topicName)
result.setErrorCode(error.error.code)
.setErrorMessage(error.message)
@ -1951,7 +1952,7 @@ class KafkaApis(val requestChannel: RequestChannel,
// the callback for sending a DeleteRecordsResponse
def sendResponseCallback(authorizedTopicResponses: Map[TopicPartition, DeleteRecordsPartitionResult]): Unit = {
val mergedResponseStatus = authorizedTopicResponses ++ unauthorizedTopicResponses ++ nonExistingTopicResponses
mergedResponseStatus.foreach { case (topicPartition, status) =>
mergedResponseStatus.forKeyValue { (topicPartition, status) =>
if (status.errorCode != Errors.NONE.code) {
debug("DeleteRecordsRequest with correlation id %d from client %s on partition %s failed due to %s".format(
request.header.correlationId,
@ -2330,7 +2331,7 @@ class KafkaApis(val requestChannel: RequestChannel,
def sendResponseCallback(authorizedTopicErrors: Map[TopicPartition, Errors]): Unit = {
val combinedCommitStatus = mutable.Map() ++= authorizedTopicErrors ++= unauthorizedTopicErrors ++= nonExistingTopicErrors
if (isDebugEnabled)
combinedCommitStatus.foreach { case (topicPartition, error) =>
combinedCommitStatus.forKeyValue { (topicPartition, error) =>
if (error != Errors.NONE) {
debug(s"TxnOffsetCommit with correlation id ${header.correlationId} from client ${header.clientId} " +
s"on partition $topicPartition failed due to ${error.exceptionName}")
@ -2889,11 +2890,11 @@ class KafkaApis(val requestChannel: RequestChannel,
val electionResults = new util.ArrayList[ReplicaElectionResult]()
adjustedResults
.groupBy { case (tp, _) => tp.topic }
.foreach { case (topic, ps) =>
.forKeyValue { (topic, ps) =>
val electionResult = new ReplicaElectionResult()
electionResult.setTopic(topic)
ps.foreach { case (topicPartition, error) =>
ps.forKeyValue { (topicPartition, error) =>
val partitionResult = new PartitionResult()
partitionResult.setPartitionId(topicPartition.partition)
partitionResult.setErrorCode(error.error.code)
@ -2969,9 +2970,9 @@ class KafkaApis(val requestChannel: RequestChannel,
offsetDeleteRequest.getErrorResponse(requestThrottleMs, groupError)
else {
val topics = new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection
topicPartitionErrors.groupBy(_._1.topic).foreach { case (topic, topicPartitions) =>
topicPartitionErrors.groupBy(_._1.topic).forKeyValue { (topic, topicPartitions) =>
val partitions = new OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection
topicPartitions.foreach { case (topicPartition, error) =>
topicPartitions.forKeyValue { (topicPartition, error) =>
partitions.add(
new OffsetDeleteResponseData.OffsetDeleteResponsePartition()
.setPartitionIndex(topicPartition.partition)

View File

@ -28,6 +28,7 @@ import kafka.api._
import kafka.controller.StateChangeLogger
import kafka.utils.CoreUtils._
import kafka.utils.Logging
import kafka.utils.Implicits._
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataPartitionState
import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition}
@ -319,7 +320,7 @@ class MetadataCache(brokerId: Int) extends Logging {
} else {
//since kafka may do partial metadata updates, we start by copying the previous state
val partitionStates = new mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataPartitionState]](metadataSnapshot.partitionStates.size)
metadataSnapshot.partitionStates.foreach { case (topic, oldPartitionStates) =>
metadataSnapshot.partitionStates.forKeyValue { (topic, oldPartitionStates) =>
val copy = new mutable.LongMap[UpdateMetadataPartitionState](oldPartitionStates.size)
copy ++= oldPartitionStates
partitionStates(topic) = copy

View File

@ -24,6 +24,7 @@ import kafka.cluster.BrokerEndPoint
import kafka.log.{LeaderOffsetIncremented, LogAppendInfo}
import kafka.server.AbstractFetcherThread.ReplicaFetch
import kafka.server.AbstractFetcherThread.ResultWithPartitions
import kafka.utils.Implicits._
import org.apache.kafka.clients.FetchSessionHandler
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.KafkaStorageException
@ -252,7 +253,7 @@ class ReplicaFetcherThread(name: String,
val partitionsWithError = mutable.Set[TopicPartition]()
val builder = fetchSessionHandler.newBuilder(partitionMap.size, false)
partitionMap.foreach { case (topicPartition, fetchState) =>
partitionMap.forKeyValue { (topicPartition, fetchState) =>
// We will not include a replica in the fetch request if it should be throttled.
if (fetchState.isReadyForFetch && !shouldFollowerThrottle(quota, fetchState, topicPartition)) {
try {

View File

@ -34,6 +34,7 @@ import kafka.server.HostedPartition.Online
import kafka.server.QuotaFactory.QuotaManagers
import kafka.server.checkpoints.{LazyOffsetCheckpoints, OffsetCheckpointFile, OffsetCheckpoints}
import kafka.utils._
import kafka.utils.Implicits._
import kafka.zk.KafkaZkClient
import org.apache.kafka.common.{ElectionType, IsolationLevel, Node, TopicPartition}
import org.apache.kafka.common.errors._
@ -355,7 +356,7 @@ class ReplicaManager(val config: KafkaConfig,
stateChangeLogger.info(s"Handling StopReplica request correlationId $correlationId from controller " +
s"$controllerId for ${partitionStates.size} partitions")
if (stateChangeLogger.isTraceEnabled)
partitionStates.foreach { case (topicPartition, partitionState) =>
partitionStates.forKeyValue { (topicPartition, partitionState) =>
stateChangeLogger.trace(s"Received StopReplica request $partitionState " +
s"correlation id $correlationId from controller $controllerId " +
s"epoch $controllerEpoch for partition $topicPartition")
@ -372,7 +373,7 @@ class ReplicaManager(val config: KafkaConfig,
this.controllerEpoch = controllerEpoch
val stoppedPartitions = mutable.Map.empty[TopicPartition, StopReplicaPartitionState]
partitionStates.foreach { case (topicPartition, partitionState) =>
partitionStates.forKeyValue { (topicPartition, partitionState) =>
val deletePartition = partitionState.deletePartition
getPartition(topicPartition) match {
@ -427,7 +428,7 @@ class ReplicaManager(val config: KafkaConfig,
// Second remove deleted partitions from the partition map. Fetchers rely on the
// ReplicaManager to get Partition's information so they must be stopped first.
val deletedPartitions = mutable.Set.empty[TopicPartition]
stoppedPartitions.foreach { case (topicPartition, partitionState) =>
stoppedPartitions.forKeyValue { (topicPartition, partitionState) =>
if (partitionState.deletePartition) {
getPartition(topicPartition) match {
case hostedPartition@HostedPartition.Online(partition) =>
@ -1500,7 +1501,7 @@ class ReplicaManager(val config: KafkaConfig,
s"controller $controllerId epoch $controllerEpoch as part of the become-leader transition for " +
s"${partitionStates.size} partitions")
// Update the partition information to be the leader
partitionStates.foreach { case (partition, partitionState) =>
partitionStates.forKeyValue { (partition, partitionState) =>
try {
if (partition.makeLeader(partitionState, highWatermarkCheckpoints))
partitionsToMakeLeaders += partition
@ -1565,7 +1566,7 @@ class ReplicaManager(val config: KafkaConfig,
responseMap: mutable.Map[TopicPartition, Errors],
highWatermarkCheckpoints: OffsetCheckpoints) : Set[Partition] = {
val traceLoggingEnabled = stateChangeLogger.isTraceEnabled
partitionStates.foreach { case (partition, partitionState) =>
partitionStates.forKeyValue { (partition, partitionState) =>
if (traceLoggingEnabled)
stateChangeLogger.trace(s"Handling LeaderAndIsr request correlationId $correlationId from controller $controllerId " +
s"epoch $controllerEpoch starting the become-follower transition for partition ${partition.topicPartition} with leader " +
@ -1576,7 +1577,7 @@ class ReplicaManager(val config: KafkaConfig,
val partitionsToMakeFollower: mutable.Set[Partition] = mutable.Set()
try {
// TODO: Delete leaders from LeaderAndIsrRequest
partitionStates.foreach { case (partition, partitionState) =>
partitionStates.forKeyValue { (partition, partitionState) =>
val newLeaderBrokerId = partitionState.leader
try {
metadataCache.getAliveBrokers.find(_.id == newLeaderBrokerId) match {

View File

@ -429,7 +429,7 @@ object ConsoleConsumer extends Logging {
// avoid auto-committing offsets which haven't been consumed
smallestUnconsumedOffsets.getOrElseUpdate(tp, record.offset)
}
smallestUnconsumedOffsets.foreach { case (tp, offset) => consumer.seek(tp, offset) }
smallestUnconsumedOffsets.forKeyValue { (tp, offset) => consumer.seek(tp, offset) }
}
def receive(): ConsumerRecord[Array[Byte], Array[Byte]] = {
@ -467,7 +467,7 @@ class DefaultMessageFormatter extends MessageFormatter {
override def configure(configs: Map[String, _]): Unit = {
val props = new java.util.Properties()
configs.asScala.foreach { case (key, value) => props.put(key, value.toString) }
configs.asScala.forKeyValue { (key, value) => props.put(key, value.toString) }
if (props.containsKey("print.timestamp"))
printTimestamp = props.getProperty("print.timestamp").trim.equalsIgnoreCase("true")
if (props.containsKey("print.key"))
@ -496,7 +496,7 @@ class DefaultMessageFormatter extends MessageFormatter {
private def propertiesWithKeyPrefixStripped(prefix: String, props: Properties): Properties = {
val newProps = new Properties()
props.asScala.foreach { case (key, value) =>
props.asScala.forKeyValue { (key, value) =>
if (key.startsWith(prefix) && key.length > prefix.length)
newProps.put(key.substring(prefix.length), value)
}

View File

@ -24,6 +24,7 @@ import kafka.coordinator.transaction.TransactionLog
import kafka.log._
import kafka.serializer.Decoder
import kafka.utils._
import kafka.utils.Implicits._
import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.Utils
@ -68,7 +69,7 @@ object DumpLogSegments {
}
}
misMatchesForIndexFilesMap.foreach { case (fileName, listOfMismatches) =>
misMatchesForIndexFilesMap.forKeyValue { (fileName, listOfMismatches) =>
System.err.println(s"Mismatches in :$fileName")
listOfMismatches.foreach { case (indexOffset, logOffset) =>
System.err.println(s" Index offset: $indexOffset, log offset: $logOffset")
@ -77,7 +78,7 @@ object DumpLogSegments {
timeIndexDumpErrors.printErrors()
nonConsecutivePairsForLogFilesMap.foreach { case (fileName, listOfNonConsecutivePairs) =>
nonConsecutivePairsForLogFilesMap.forKeyValue { (fileName, listOfNonConsecutivePairs) =>
System.err.println(s"Non-consecutive offsets in $fileName")
listOfNonConsecutivePairs.foreach { case (first, second) =>
System.err.println(s" $first is followed by $second")

View File

@ -133,7 +133,7 @@ object GetOffsetShell {
}
}
partitionOffsets.toSeq.sortBy { case (tp, _) => tp.partition }.foreach { case (tp, offset) =>
partitionOffsets.toArray.sortBy { case (tp, _) => tp.partition }.foreach { case (tp, offset) =>
println(s"$topic:${tp.partition}:${Option(offset).getOrElse("")}")
}

View File

@ -20,6 +20,7 @@ package kafka.utils
import java.util
import java.util.Properties
import scala.annotation.nowarn
import scala.jdk.CollectionConverters._
/**
@ -46,4 +47,21 @@ object Implicits {
}
/**
* Exposes `forKeyValue` which maps to `foreachEntry` in Scala 2.13 and `foreach` in Scala 2.12
* (with the help of scala.collection.compat). `foreachEntry` avoids the tuple allocation and
* is more efficient.
*
* This was not named `foreachEntry` to avoid `unused import` warnings in Scala 2.13 (the implicit
* would not be triggered in Scala 2.13 since `Map.foreachEntry` would have precedence).
*/
@nowarn("cat=unused-imports")
implicit class MapExtensionMethods[K, V](private val self: scala.collection.Map[K, V]) extends AnyVal {
import scala.collection.compat._
def forKeyValue[U](f: (K, V) => U): Unit = {
self.foreachEntry { (k, v) => f(k, v) }
}
}
}

View File

@ -24,6 +24,7 @@ import kafka.controller.ReplicaAssignment
import kafka.log.LogConfig
import kafka.server.{ConfigEntityName, ConfigType, DynamicConfig}
import kafka.utils._
import kafka.utils.Implicits._
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.Topic
@ -293,7 +294,7 @@ class AdminZkClient(zkClient: KafkaZkClient) extends Logging {
expectedReplicationFactor: Int,
availableBrokerIds: Set[Int]): Unit = {
replicaAssignment.foreach { case (partitionId, replicas) =>
replicaAssignment.forKeyValue { (partitionId, replicas) =>
if (replicas.isEmpty)
throw new InvalidReplicaAssignmentException(
s"Cannot have replication factor of 0 for partition id $partitionId.")

View File

@ -142,7 +142,7 @@ class TransactionsBounceTest extends IntegrationTestHarness {
}
val outputRecords = new mutable.ListBuffer[Int]()
recordsByPartition.values.foreach { case (partitionValues) =>
recordsByPartition.values.foreach { partitionValues =>
assertEquals("Out of order messages detected", partitionValues, partitionValues.sorted)
outputRecords.appendAll(partitionValues)
}

View File

@ -101,7 +101,7 @@ versions += [
powermock: "2.0.7",
reflections: "0.9.12",
rocksDB: "5.18.4",
scalaCollectionCompat: "2.1.6",
scalaCollectionCompat: "2.2.0",
scalafmt: "1.5.1",
scalaJava8Compat : "0.9.1",
scalatest: "3.0.8",

View File

@ -109,6 +109,12 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read
<Bug pattern="EQ_UNUSUAL"/>
</Match>
<Match>
<!-- Suppression for the equals() for extension methods. -->
<Class name="kafka.utils.Implicits$MapExtensionMethods"/>
<Bug pattern="EQ_UNUSUAL"/>
</Match>
<Match>
<!-- Add a suppression for auto-generated calls to instanceof in kafka.utils.Json -->
<Source name="Json.scala"/>