KAFKA-12895 Drop support for Scala 2.12 in Kafka 4.0 (#17313)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
TengYao Chi 2024-10-07 01:34:38 +08:00 committed by GitHub
parent d38a90df2b
commit 0e4eebe9c0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
55 changed files with 175 additions and 369 deletions

View File

@ -226,7 +226,6 @@ jackson-jaxrs-json-provider-2.16.2
jackson-module-afterburner-2.16.2 jackson-module-afterburner-2.16.2
jackson-module-jaxb-annotations-2.16.2 jackson-module-jaxb-annotations-2.16.2
jackson-module-scala_2.13-2.16.2 jackson-module-scala_2.13-2.16.2
jackson-module-scala_2.12-2.16.2
jakarta.validation-api-2.0.2 jakarta.validation-api-2.0.2
javassist-3.29.2-GA javassist-3.29.2-GA
jetty-client-9.4.54.v20240208 jetty-client-9.4.54.v20240208
@ -257,15 +256,9 @@ opentelemetry-proto-1.0.0-alpha
plexus-utils-3.5.1 plexus-utils-3.5.1
reload4j-1.2.25 reload4j-1.2.25
rocksdbjni-7.9.2 rocksdbjni-7.9.2
scala-collection-compat_2.12-2.10.0
scala-collection-compat_2.13-2.10.0
scala-library-2.12.19
scala-library-2.13.15 scala-library-2.13.15
scala-logging_2.12-3.9.5
scala-logging_2.13-3.9.5 scala-logging_2.13-3.9.5
scala-reflect-2.12.19
scala-reflect-2.13.15 scala-reflect-2.13.15
scala-java8-compat_2.12-1.0.2
scala-java8-compat_2.13-1.0.2 scala-java8-compat_2.13-1.0.2
snappy-java-1.1.10.5 snappy-java-1.1.10.5
swagger-annotations-2.2.8 swagger-annotations-2.2.8

View File

@ -11,9 +11,7 @@ the broker and tools has been deprecated since Apache Kafka 3.7 and removal of b
see [KIP-750](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181308223) and see [KIP-750](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181308223) and
[KIP-1013](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=284789510) for more details). [KIP-1013](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=284789510) for more details).
Scala 2.12 and 2.13 are supported and 2.13 is used by default. Scala 2.12 support has been deprecated since Scala 2.13 is the only supported version in Apache Kafka.
Apache Kafka 3.0 and will be removed in Apache Kafka 4.0 (see [KIP-751](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181308218)
for more details). See below for how to use a specific Scala version or all of the supported Scala versions.
### Build a jar and run it ### ### Build a jar and run it ###
./gradlew jar ./gradlew jar
@ -122,23 +120,6 @@ Using compiled files:
### Cleaning the build ### ### Cleaning the build ###
./gradlew clean ./gradlew clean
### Running a task with one of the Scala versions available (2.12.x or 2.13.x) ###
*Note that if building the jars with a version other than 2.13.x, you need to set the `SCALA_VERSION` variable or change it in `bin/kafka-run-class.sh` to run the quick start.*
You can pass either the major version (eg 2.12) or the full version (eg 2.12.7):
./gradlew -PscalaVersion=2.12 jar
./gradlew -PscalaVersion=2.12 test
./gradlew -PscalaVersion=2.12 releaseTarGz
### Running a task with all the scala versions enabled by default ###
Invoke the `gradlewAll` script followed by the task(s):
./gradlewAll test
./gradlewAll jar
./gradlewAll releaseTarGz
### Running a task for a specific project ### ### Running a task for a specific project ###
This is for `core`, `examples` and `clients` This is for `core`, `examples` and `clients`
@ -162,24 +143,6 @@ The `eclipse` task has been configured to use `${project_dir}/build_eclipse` as
build directory (`${project_dir}/bin`) clashes with Kafka's scripts directory and we don't use Gradle's build directory build directory (`${project_dir}/bin`) clashes with Kafka's scripts directory and we don't use Gradle's build directory
to avoid known issues with this configuration. to avoid known issues with this configuration.
### Publishing the jar for all versions of Scala and for all projects to maven ###
The recommended command is:
./gradlewAll publish
For backwards compatibility, the following also works:
./gradlewAll uploadArchives
Please note for this to work you should create/update `${GRADLE_USER_HOME}/gradle.properties` (typically, `~/.gradle/gradle.properties`) and assign the following variables
mavenUrl=
mavenUsername=
mavenPassword=
signing.keyId=
signing.password=
signing.secretKeyRingFile=
### Publishing the streams quickstart archetype artifact to maven ### ### Publishing the streams quickstart archetype artifact to maven ###
For the Streams archetype project, one cannot use gradle to upload to maven; instead the `mvn deploy` command needs to be called at the quickstart folder: For the Streams archetype project, one cannot use gradle to upload to maven; instead the `mvn deploy` command needs to be called at the quickstart folder:
@ -209,22 +172,10 @@ Please note for this to work you should create/update user maven settings (typic
</servers> </servers>
... ...
### Installing ALL the jars to the local Maven repository ###
The recommended command to build for both Scala 2.12 and 2.13 is:
./gradlewAll publishToMavenLocal
For backwards compatibility, the following also works:
./gradlewAll install
### Installing specific projects to the local Maven repository ### ### Installing specific projects to the local Maven repository ###
./gradlew -PskipSigning=true :streams:publishToMavenLocal ./gradlew -PskipSigning=true :streams:publishToMavenLocal
If needed, you can specify the Scala version with `-PscalaVersion=2.13`.
### Building the test jar ### ### Building the test jar ###
./gradlew testJar ./gradlew testJar

View File

@ -773,24 +773,10 @@ subprojects {
scalaCompileOptions.additionalParameters += inlineFrom scalaCompileOptions.additionalParameters += inlineFrom
} }
if (versions.baseScala != '2.12') {
scalaCompileOptions.additionalParameters += ["-opt-warnings", "-Xlint:strict-unsealed-patmat"] scalaCompileOptions.additionalParameters += ["-opt-warnings", "-Xlint:strict-unsealed-patmat"]
// Scala 2.13.2 introduces compiler warnings suppression, which is a pre-requisite for -Xfatal-warnings // Scala 2.13.2 introduces compiler warnings suppression, which is a pre-requisite for -Xfatal-warnings
scalaCompileOptions.additionalParameters += ["-Xfatal-warnings"] scalaCompileOptions.additionalParameters += ["-Xfatal-warnings"]
}
// these options are valid for Scala versions < 2.13 only
// Scala 2.13 removes them, see https://github.com/scala/scala/pull/6502 and https://github.com/scala/scala/pull/5969
if (versions.baseScala == '2.12') {
scalaCompileOptions.additionalParameters += [
"-Xlint:by-name-right-associative",
"-Xlint:nullary-override",
"-Xlint:unsound-match"
]
}
// Scalac 2.12 `-release` requires Java 9 or higher, but Scala 2.13 doesn't have that restriction
if (versions.baseScala == "2.13" || JavaVersion.current().isJava9Compatible())
scalaCompileOptions.additionalParameters += ["-release", String.valueOf(minJavaVersion)] scalaCompileOptions.additionalParameters += ["-release", String.valueOf(minJavaVersion)]
addParametersForTests(name, options) addParametersForTests(name, options)
@ -1096,7 +1082,6 @@ project(':core') {
implementation libs.joptSimple implementation libs.joptSimple
implementation libs.jose4j implementation libs.jose4j
implementation libs.metrics implementation libs.metrics
implementation libs.scalaCollectionCompat
implementation libs.scalaJava8Compat implementation libs.scalaJava8Compat
// only needed transitively, but set it explicitly to ensure it has the same version as scala-library // only needed transitively, but set it explicitly to ensure it has the same version as scala-library
implementation libs.scalaReflect implementation libs.scalaReflect
@ -2813,14 +2798,6 @@ project(':streams:streams-scala') {
api project(':streams') api project(':streams')
api libs.scalaLibrary api libs.scalaLibrary
if ( versions.baseScala == '2.12' ) {
// Scala-Collection-Compat isn't required when compiling with Scala 2.13 or later,
// and having it in the dependencies could lead to classpath conflicts in Scala 3
// projects that use kafka-streams-kafka_2.13 (because we don't have a Scala 3 version yet)
// but also pull in scala-collection-compat_3 via another dependency.
// So we make sure to not include it in the dependencies.
api libs.scalaCollectionCompat
}
testImplementation project(':group-coordinator') testImplementation project(':group-coordinator')
testImplementation project(':core') testImplementation project(':core')
testImplementation project(':test-common') testImplementation project(':test-common')

View File

@ -628,7 +628,7 @@ object ConfigCommand extends Logging {
private def describeQuotaConfigs(adminClient: Admin, entityTypes: List[String], entityNames: List[String]): Unit = { private def describeQuotaConfigs(adminClient: Admin, entityTypes: List[String], entityNames: List[String]): Unit = {
val quotaConfigs = getAllClientQuotasConfigs(adminClient, entityTypes, entityNames) val quotaConfigs = getAllClientQuotasConfigs(adminClient, entityTypes, entityNames)
quotaConfigs.forKeyValue { (entity, entries) => quotaConfigs.foreachEntry { (entity, entries) =>
val entityEntries = entity.entries.asScala val entityEntries = entity.entries.asScala
def entitySubstr(entityType: String): Option[String] = def entitySubstr(entityType: String): Option[String] =

View File

@ -19,7 +19,6 @@ package kafka.admin
import joptsimple.{OptionSet, OptionSpec, OptionSpecBuilder} import joptsimple.{OptionSet, OptionSpec, OptionSpecBuilder}
import kafka.server.KafkaConfig import kafka.server.KafkaConfig
import kafka.utils.Implicits._
import kafka.utils.{Logging, ToolsUtils} import kafka.utils.{Logging, ToolsUtils}
import kafka.zk.{ControllerZNode, KafkaZkClient, ZkData, ZkSecurityMigratorUtils} import kafka.zk.{ControllerZNode, KafkaZkClient, ZkData, ZkSecurityMigratorUtils}
import org.apache.kafka.common.security.JaasUtils import org.apache.kafka.common.security.JaasUtils
@ -130,7 +129,7 @@ object ZkSecurityMigrator extends Logging {
// Now override any set system properties with explicitly-provided values from the config file // Now override any set system properties with explicitly-provided values from the config file
// Emit INFO logs due to camel-case property names encouraging mistakes -- help people see mistakes they make // Emit INFO logs due to camel-case property names encouraging mistakes -- help people see mistakes they make
info(s"Found ${zkTlsConfigFileProps.size()} ZooKeeper client configuration properties in file $filename") info(s"Found ${zkTlsConfigFileProps.size()} ZooKeeper client configuration properties in file $filename")
zkTlsConfigFileProps.asScala.forKeyValue { (key, value) => zkTlsConfigFileProps.asScala.foreachEntry { (key, value) =>
info(s"Setting $key") info(s"Setting $key")
KafkaConfig.setZooKeeperClientProperty(zkClientConfig, key, value) KafkaConfig.setZooKeeperClientProperty(zkClientConfig, key, value)
} }

View File

@ -19,7 +19,6 @@ package kafka.controller
import com.yammer.metrics.core.{Gauge, Timer} import com.yammer.metrics.core.{Gauge, Timer}
import kafka.cluster.Broker import kafka.cluster.Broker
import kafka.server.KafkaConfig import kafka.server.KafkaConfig
import kafka.utils.Implicits._
import kafka.utils._ import kafka.utils._
import org.apache.kafka.clients._ import org.apache.kafka.clients._
import org.apache.kafka.common._ import org.apache.kafka.common._
@ -524,11 +523,11 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,
else if (metadataVersion.isAtLeast(IBP_1_0_IV0)) 1 else if (metadataVersion.isAtLeast(IBP_1_0_IV0)) 1
else 0 else 0
leaderAndIsrRequestMap.forKeyValue { (broker, leaderAndIsrPartitionStates) => leaderAndIsrRequestMap.foreachEntry { (broker, leaderAndIsrPartitionStates) =>
if (metadataInstance.liveOrShuttingDownBrokerIds.contains(broker)) { if (metadataInstance.liveOrShuttingDownBrokerIds.contains(broker)) {
val leaderIds = mutable.Set.empty[Int] val leaderIds = mutable.Set.empty[Int]
var numBecomeLeaders = 0 var numBecomeLeaders = 0
leaderAndIsrPartitionStates.forKeyValue { (topicPartition, state) => leaderAndIsrPartitionStates.foreachEntry { (topicPartition, state) =>
leaderIds += state.leader leaderIds += state.leader
val typeOfRequest = if (broker == state.leader) { val typeOfRequest = if (broker == state.leader) {
numBecomeLeaders += 1 numBecomeLeaders += 1
@ -669,10 +668,10 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,
handleStopReplicaResponse(stopReplicaResponse, brokerId, partitionErrorsForDeletingTopics.toMap) handleStopReplicaResponse(stopReplicaResponse, brokerId, partitionErrorsForDeletingTopics.toMap)
} }
stopReplicaRequestMap.forKeyValue { (brokerId, partitionStates) => stopReplicaRequestMap.foreachEntry { (brokerId, partitionStates) =>
if (metadataInstance.liveOrShuttingDownBrokerIds.contains(brokerId)) { if (metadataInstance.liveOrShuttingDownBrokerIds.contains(brokerId)) {
if (traceEnabled) if (traceEnabled)
partitionStates.forKeyValue { (topicPartition, partitionState) => partitionStates.foreachEntry { (topicPartition, partitionState) =>
stateChangeLog.trace(s"Sending StopReplica request $partitionState to " + stateChangeLog.trace(s"Sending StopReplica request $partitionState to " +
s"broker $brokerId for partition $topicPartition") s"broker $brokerId for partition $topicPartition")
} }
@ -680,7 +679,7 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,
val brokerEpoch = metadataInstance.liveBrokerIdAndEpochs(brokerId) val brokerEpoch = metadataInstance.liveBrokerIdAndEpochs(brokerId)
if (stopReplicaRequestVersion >= 3) { if (stopReplicaRequestVersion >= 3) {
val stopReplicaTopicState = mutable.Map.empty[String, StopReplicaTopicState] val stopReplicaTopicState = mutable.Map.empty[String, StopReplicaTopicState]
partitionStates.forKeyValue { (topicPartition, partitionState) => partitionStates.foreachEntry { (topicPartition, partitionState) =>
val topicState = stopReplicaTopicState.getOrElseUpdate(topicPartition.topic, val topicState = stopReplicaTopicState.getOrElseUpdate(topicPartition.topic,
new StopReplicaTopicState().setTopicName(topicPartition.topic)) new StopReplicaTopicState().setTopicName(topicPartition.topic))
topicState.partitionStates().add(partitionState) topicState.partitionStates().add(partitionState)
@ -699,7 +698,7 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,
val topicStatesWithDelete = mutable.Map.empty[String, StopReplicaTopicState] val topicStatesWithDelete = mutable.Map.empty[String, StopReplicaTopicState]
val topicStatesWithoutDelete = mutable.Map.empty[String, StopReplicaTopicState] val topicStatesWithoutDelete = mutable.Map.empty[String, StopReplicaTopicState]
partitionStates.forKeyValue { (topicPartition, partitionState) => partitionStates.foreachEntry { (topicPartition, partitionState) =>
val topicStates = if (partitionState.deletePartition()) { val topicStates = if (partitionState.deletePartition()) {
numPartitionStateWithDelete += 1 numPartitionStateWithDelete += 1
topicStatesWithDelete topicStatesWithDelete

View File

@ -18,7 +18,6 @@
package kafka.controller package kafka.controller
import kafka.cluster.Broker import kafka.cluster.Broker
import kafka.utils.Implicits._
import org.apache.kafka.common.{TopicPartition, Uuid} import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.metadata.LeaderAndIsr import org.apache.kafka.metadata.LeaderAndIsr
@ -522,7 +521,7 @@ class ControllerContext extends ControllerChannelContext {
} }
private def cleanPreferredReplicaImbalanceMetric(topic: String): Unit = { private def cleanPreferredReplicaImbalanceMetric(topic: String): Unit = {
partitionAssignments.getOrElse(topic, mutable.Map.empty).forKeyValue { (partition, replicaAssignment) => partitionAssignments.getOrElse(topic, mutable.Map.empty).foreachEntry { (partition, replicaAssignment) =>
partitionLeadershipInfo.get(new TopicPartition(topic, partition)).foreach { leadershipInfo => partitionLeadershipInfo.get(new TopicPartition(topic, partition)).foreach { leadershipInfo =>
if (!hasPreferredLeader(replicaAssignment, leadershipInfo)) if (!hasPreferredLeader(replicaAssignment, leadershipInfo))
preferredReplicaImbalanceCount -= 1 preferredReplicaImbalanceCount -= 1

View File

@ -26,7 +26,6 @@ import kafka.coordinator.transaction.ZkProducerIdManager
import kafka.server._ import kafka.server._
import kafka.server.metadata.ZkFinalizedFeatureCache import kafka.server.metadata.ZkFinalizedFeatureCache
import kafka.utils._ import kafka.utils._
import kafka.utils.Implicits._
import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
import kafka.zk.TopicZNode.TopicIdReplicaAssignment import kafka.zk.TopicZNode.TopicIdReplicaAssignment
import kafka.zk.{FeatureZNodeStatus, _} import kafka.zk.{FeatureZNodeStatus, _}
@ -1030,7 +1029,7 @@ class KafkaController(val config: KafkaConfig,
private def updateLeaderAndIsrCache(partitions: Seq[TopicPartition] = controllerContext.allPartitions.toSeq): Unit = { private def updateLeaderAndIsrCache(partitions: Seq[TopicPartition] = controllerContext.allPartitions.toSeq): Unit = {
val leaderIsrAndControllerEpochs = zkClient.getTopicPartitionStates(partitions) val leaderIsrAndControllerEpochs = zkClient.getTopicPartitionStates(partitions)
leaderIsrAndControllerEpochs.forKeyValue { (partition, leaderIsrAndControllerEpoch) => leaderIsrAndControllerEpochs.foreachEntry { (partition, leaderIsrAndControllerEpoch) =>
controllerContext.putPartitionLeadershipInfo(partition, leaderIsrAndControllerEpoch) controllerContext.putPartitionLeadershipInfo(partition, leaderIsrAndControllerEpoch)
} }
} }
@ -1297,7 +1296,7 @@ class KafkaController(val config: KafkaConfig,
}.toMap.groupBy { case (_, assignedReplicas) => assignedReplicas.head } }.toMap.groupBy { case (_, assignedReplicas) => assignedReplicas.head }
// for each broker, check if a preferred replica election needs to be triggered // for each broker, check if a preferred replica election needs to be triggered
preferredReplicasForTopicsByBrokers.forKeyValue { (leaderBroker, topicPartitionsForBroker) => preferredReplicasForTopicsByBrokers.foreachEntry { (leaderBroker, topicPartitionsForBroker) =>
val topicsNotInPreferredReplica = topicPartitionsForBroker.filter { case (topicPartition, _) => val topicsNotInPreferredReplica = topicPartitionsForBroker.filter { case (topicPartition, _) =>
val leadershipInfo = controllerContext.partitionLeadershipInfo(topicPartition) val leadershipInfo = controllerContext.partitionLeadershipInfo(topicPartition)
leadershipInfo.exists(_.leaderAndIsr.leader != leaderBroker) leadershipInfo.exists(_.leaderAndIsr.leader != leaderBroker)
@ -1776,7 +1775,7 @@ class KafkaController(val config: KafkaConfig,
} }
} else if (partitionsToBeAdded.nonEmpty) { } else if (partitionsToBeAdded.nonEmpty) {
info(s"New partitions to be added $partitionsToBeAdded") info(s"New partitions to be added $partitionsToBeAdded")
partitionsToBeAdded.forKeyValue { (topicPartition, assignedReplicas) => partitionsToBeAdded.foreachEntry { (topicPartition, assignedReplicas) =>
controllerContext.updatePartitionFullReplicaAssignment(topicPartition, assignedReplicas) controllerContext.updatePartitionFullReplicaAssignment(topicPartition, assignedReplicas)
} }
onNewPartitionCreation(partitionsToBeAdded.keySet) onNewPartitionCreation(partitionsToBeAdded.keySet)
@ -1821,7 +1820,7 @@ class KafkaController(val config: KafkaConfig,
val reassignmentResults = mutable.Map.empty[TopicPartition, ApiError] val reassignmentResults = mutable.Map.empty[TopicPartition, ApiError]
val partitionsToReassign = mutable.Map.empty[TopicPartition, ReplicaAssignment] val partitionsToReassign = mutable.Map.empty[TopicPartition, ReplicaAssignment]
zkClient.getPartitionReassignment.forKeyValue { (tp, targetReplicas) => zkClient.getPartitionReassignment.foreachEntry { (tp, targetReplicas) =>
maybeBuildReassignment(tp, Some(targetReplicas)) match { maybeBuildReassignment(tp, Some(targetReplicas)) match {
case Some(context) => partitionsToReassign.put(tp, context) case Some(context) => partitionsToReassign.put(tp, context)
case None => reassignmentResults.put(tp, new ApiError(Errors.NO_REASSIGNMENT_IN_PROGRESS)) case None => reassignmentResults.put(tp, new ApiError(Errors.NO_REASSIGNMENT_IN_PROGRESS))
@ -1858,7 +1857,7 @@ class KafkaController(val config: KafkaConfig,
val reassignmentResults = mutable.Map.empty[TopicPartition, ApiError] val reassignmentResults = mutable.Map.empty[TopicPartition, ApiError]
val partitionsToReassign = mutable.Map.empty[TopicPartition, ReplicaAssignment] val partitionsToReassign = mutable.Map.empty[TopicPartition, ReplicaAssignment]
reassignments.forKeyValue { (tp, targetReplicas) => reassignments.foreachEntry { (tp, targetReplicas) =>
val maybeApiError = targetReplicas.flatMap(validateReplicas(tp, _)) val maybeApiError = targetReplicas.flatMap(validateReplicas(tp, _))
maybeApiError match { maybeApiError match {
case None => case None =>
@ -2304,7 +2303,7 @@ class KafkaController(val config: KafkaConfig,
// After we have returned the result of the `AlterPartition` request, we should check whether // After we have returned the result of the `AlterPartition` request, we should check whether
// there are any reassignments which can be completed by a successful ISR expansion. // there are any reassignments which can be completed by a successful ISR expansion.
partitionResponses.forKeyValue { (topicPartition, partitionResponse) => partitionResponses.foreachEntry { (topicPartition, partitionResponse) =>
if (controllerContext.partitionsBeingReassigned.contains(topicPartition)) { if (controllerContext.partitionsBeingReassigned.contains(topicPartition)) {
val isSuccessfulUpdate = partitionResponse.isRight val isSuccessfulUpdate = partitionResponse.isRight
if (isSuccessfulUpdate) { if (isSuccessfulUpdate) {
@ -2480,7 +2479,7 @@ class KafkaController(val config: KafkaConfig,
partitionsToAlter.keySet partitionsToAlter.keySet
) )
partitionResponses.groupBy(_._1.topic).forKeyValue { (topicName, partitionResponses) => partitionResponses.groupBy(_._1.topic).foreachEntry { (topicName, partitionResponses) =>
// Add each topic part to the response // Add each topic part to the response
val topicResponse = if (useTopicsIds) { val topicResponse = if (useTopicsIds) {
new AlterPartitionResponseData.TopicData() new AlterPartitionResponseData.TopicData()
@ -2491,7 +2490,7 @@ class KafkaController(val config: KafkaConfig,
} }
alterPartitionResponse.topics.add(topicResponse) alterPartitionResponse.topics.add(topicResponse)
partitionResponses.forKeyValue { (tp, errorOrIsr) => partitionResponses.foreachEntry { (tp, errorOrIsr) =>
// Add each partition part to the response (new ISR or error) // Add each partition part to the response (new ISR or error)
errorOrIsr match { errorOrIsr match {
case Left(error) => case Left(error) =>

View File

@ -19,7 +19,6 @@ package kafka.controller
import kafka.common.StateChangeFailedException import kafka.common.StateChangeFailedException
import kafka.controller.Election._ import kafka.controller.Election._
import kafka.server.KafkaConfig import kafka.server.KafkaConfig
import kafka.utils.Implicits._
import kafka.utils.Logging import kafka.utils.Logging
import kafka.zk.KafkaZkClient import kafka.zk.KafkaZkClient
import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
@ -437,7 +436,7 @@ class ZkPartitionStateMachine(config: KafkaConfig,
val adjustedLeaderAndIsrs = partitionsWithLeaders.map(result => result.topicPartition -> result.leaderAndIsr.get).toMap val adjustedLeaderAndIsrs = partitionsWithLeaders.map(result => result.topicPartition -> result.leaderAndIsr.get).toMap
val UpdateLeaderAndIsrResult(finishedUpdates, updatesToRetry) = zkClient.updateLeaderAndIsr( val UpdateLeaderAndIsrResult(finishedUpdates, updatesToRetry) = zkClient.updateLeaderAndIsr(
adjustedLeaderAndIsrs, controllerContext.epoch, controllerContext.epochZkVersion) adjustedLeaderAndIsrs, controllerContext.epoch, controllerContext.epochZkVersion)
finishedUpdates.forKeyValue { (partition, result) => finishedUpdates.foreachEntry { (partition, result) =>
result.foreach { leaderAndIsr => result.foreach { leaderAndIsr =>
val replicaAssignment = controllerContext.partitionFullReplicaAssignment(partition) val replicaAssignment = controllerContext.partitionFullReplicaAssignment(partition)
val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerContext.epoch) val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerContext.epoch)

View File

@ -18,7 +18,6 @@ package kafka.controller
import kafka.common.StateChangeFailedException import kafka.common.StateChangeFailedException
import kafka.server.KafkaConfig import kafka.server.KafkaConfig
import kafka.utils.Implicits._
import kafka.utils.Logging import kafka.utils.Logging
import kafka.zk.KafkaZkClient import kafka.zk.KafkaZkClient
import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
@ -110,7 +109,7 @@ class ZkReplicaStateMachine(config: KafkaConfig,
if (replicas.nonEmpty) { if (replicas.nonEmpty) {
try { try {
controllerBrokerRequestBatch.newBatch() controllerBrokerRequestBatch.newBatch()
replicas.groupBy(_.replica).forKeyValue { (replicaId, replicas) => replicas.groupBy(_.replica).foreachEntry { (replicaId, replicas) =>
doHandleStateChanges(replicaId, replicas, targetState) doHandleStateChanges(replicaId, replicas, targetState)
} }
controllerBrokerRequestBatch.sendRequestsToBrokers(controllerContext.epoch) controllerBrokerRequestBatch.sendRequestsToBrokers(controllerContext.epoch)
@ -227,7 +226,7 @@ class ZkReplicaStateMachine(config: KafkaConfig,
controllerContext.partitionLeadershipInfo(replica.topicPartition).isDefined controllerContext.partitionLeadershipInfo(replica.topicPartition).isDefined
} }
val updatedLeaderIsrAndControllerEpochs = removeReplicasFromIsr(replicaId, replicasWithLeadershipInfo.map(_.topicPartition)) val updatedLeaderIsrAndControllerEpochs = removeReplicasFromIsr(replicaId, replicasWithLeadershipInfo.map(_.topicPartition))
updatedLeaderIsrAndControllerEpochs.forKeyValue { (partition, leaderIsrAndControllerEpoch) => updatedLeaderIsrAndControllerEpochs.foreachEntry { (partition, leaderIsrAndControllerEpoch) =>
stateLogger.info(s"Partition $partition state changed to $leaderIsrAndControllerEpoch after removing replica $replicaId from the ISR as part of transition to $OfflineReplica") stateLogger.info(s"Partition $partition state changed to $leaderIsrAndControllerEpoch after removing replica $replicaId from the ISR as part of transition to $OfflineReplica")
if (!controllerContext.isTopicQueuedUpForDeletion(partition.topic)) { if (!controllerContext.isTopicQueuedUpForDeletion(partition.topic)) {
val recipients = controllerContext.partitionReplicaAssignment(partition).filterNot(_ == replicaId) val recipients = controllerContext.partitionReplicaAssignment(partition).filterNot(_ == replicaId)

View File

@ -18,7 +18,6 @@ package kafka.coordinator.group
import kafka.common.OffsetAndMetadata import kafka.common.OffsetAndMetadata
import kafka.server.{KafkaConfig, ReplicaManager} import kafka.server.{KafkaConfig, ReplicaManager}
import kafka.utils.Implicits.MapExtensionMethods
import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid} import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.common.message.{ConsumerGroupDescribeResponseData, ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData, DeleteGroupsResponseData, DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, OffsetDeleteResponseData, OffsetFetchRequestData, OffsetFetchResponseData, ShareGroupDescribeResponseData, ShareGroupHeartbeatRequestData, ShareGroupHeartbeatResponseData, SyncGroupRequestData, SyncGroupResponseData, TxnOffsetCommitRequestData, TxnOffsetCommitResponseData} import org.apache.kafka.common.message.{ConsumerGroupDescribeResponseData, ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData, DeleteGroupsResponseData, DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, OffsetDeleteResponseData, OffsetFetchRequestData, OffsetFetchResponseData, ShareGroupDescribeResponseData, ShareGroupHeartbeatRequestData, ShareGroupHeartbeatResponseData, SyncGroupRequestData, SyncGroupResponseData, TxnOffsetCommitRequestData, TxnOffsetCommitResponseData}
import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.metrics.Metrics
@ -281,7 +280,7 @@ private[group] class GroupCoordinatorAdapter(
coordinator.handleDeleteGroups( coordinator.handleDeleteGroups(
groupIds.asScala.toSet, groupIds.asScala.toSet,
new RequestLocal(bufferSupplier) new RequestLocal(bufferSupplier)
).forKeyValue { (groupId, error) => ).foreachEntry { (groupId, error) =>
results.add(new DeleteGroupsResponseData.DeletableGroupResult() results.add(new DeleteGroupsResponseData.DeletableGroupResult()
.setGroupId(groupId) .setGroupId(groupId)
.setErrorCode(error.code)) .setErrorCode(error.code))
@ -338,7 +337,7 @@ private[group] class GroupCoordinatorAdapter(
val topicsList = new util.ArrayList[OffsetFetchResponseData.OffsetFetchResponseTopics]() val topicsList = new util.ArrayList[OffsetFetchResponseData.OffsetFetchResponseTopics]()
val topicsMap = new mutable.HashMap[String, OffsetFetchResponseData.OffsetFetchResponseTopics]() val topicsMap = new mutable.HashMap[String, OffsetFetchResponseData.OffsetFetchResponseTopics]()
results.forKeyValue { (tp, offset) => results.foreachEntry { (tp, offset) =>
val topic = topicsMap.get(tp.topic) match { val topic = topicsMap.get(tp.topic) match {
case Some(topic) => case Some(topic) =>
topic topic
@ -378,7 +377,7 @@ private[group] class GroupCoordinatorAdapter(
val response = new OffsetCommitResponseData() val response = new OffsetCommitResponseData()
val byTopics = new mutable.HashMap[String, OffsetCommitResponseData.OffsetCommitResponseTopic]() val byTopics = new mutable.HashMap[String, OffsetCommitResponseData.OffsetCommitResponseTopic]()
commitStatus.forKeyValue { (tp, error) => commitStatus.foreachEntry { (tp, error) =>
val topic = byTopics.get(tp.topic) match { val topic = byTopics.get(tp.topic) match {
case Some(existingTopic) => case Some(existingTopic) =>
existingTopic existingTopic
@ -445,7 +444,7 @@ private[group] class GroupCoordinatorAdapter(
val response = new TxnOffsetCommitResponseData() val response = new TxnOffsetCommitResponseData()
val byTopics = new mutable.HashMap[String, TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic]() val byTopics = new mutable.HashMap[String, TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic]()
results.forKeyValue { (tp, error) => results.foreachEntry { (tp, error) =>
val topic = byTopics.get(tp.topic) match { val topic = byTopics.get(tp.topic) match {
case Some(existingTopic) => case Some(existingTopic) =>
existingTopic existingTopic
@ -546,7 +545,7 @@ private[group] class GroupCoordinatorAdapter(
future.completeExceptionally(groupError.exception) future.completeExceptionally(groupError.exception)
} else { } else {
val response = new OffsetDeleteResponseData() val response = new OffsetDeleteResponseData()
topicPartitionResults.forKeyValue { (topicPartition, error) => topicPartitionResults.foreachEntry { (topicPartition, error) =>
var topic = response.topics.find(topicPartition.topic) var topic = response.topics.find(topicPartition.topic)
if (topic == null) { if (topic == null) {
topic = new OffsetDeleteResponseData.OffsetDeleteResponseTopic().setName(topicPartition.topic) topic = new OffsetDeleteResponseData.OffsetDeleteResponseTopic().setName(topicPartition.topic)

View File

@ -22,7 +22,6 @@ import java.util.concurrent.locks.ReentrantLock
import kafka.common.OffsetAndMetadata import kafka.common.OffsetAndMetadata
import kafka.utils.{CoreUtils, Logging, nonthreadsafe} import kafka.utils.{CoreUtils, Logging, nonthreadsafe}
import kafka.utils.Implicits._
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
import org.apache.kafka.common.{TopicIdPartition, TopicPartition} import org.apache.kafka.common.{TopicIdPartition, TopicPartition}
import org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember import org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember
@ -651,7 +650,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
def prepareOffsetCommit(offsets: Map[TopicIdPartition, OffsetAndMetadata]): Unit = { def prepareOffsetCommit(offsets: Map[TopicIdPartition, OffsetAndMetadata]): Unit = {
receivedConsumerOffsetCommits = true receivedConsumerOffsetCommits = true
offsets.forKeyValue { (topicIdPartition, offsetAndMetadata) => offsets.foreachEntry { (topicIdPartition, offsetAndMetadata) =>
pendingOffsetCommits += topicIdPartition.topicPartition -> offsetAndMetadata pendingOffsetCommits += topicIdPartition.topicPartition -> offsetAndMetadata
} }
} }
@ -662,7 +661,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
val producerOffsets = pendingTransactionalOffsetCommits.getOrElseUpdate(producerId, val producerOffsets = pendingTransactionalOffsetCommits.getOrElseUpdate(producerId,
mutable.Map.empty[TopicPartition, CommitRecordMetadataAndOffset]) mutable.Map.empty[TopicPartition, CommitRecordMetadataAndOffset])
offsets.forKeyValue { (topicIdPartition, offsetAndMetadata) => offsets.foreachEntry { (topicIdPartition, offsetAndMetadata) =>
producerOffsets.put(topicIdPartition.topicPartition, CommitRecordMetadataAndOffset(None, offsetAndMetadata)) producerOffsets.put(topicIdPartition.topicPartition, CommitRecordMetadataAndOffset(None, offsetAndMetadata))
} }
} }
@ -708,7 +707,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
val pendingOffsetsOpt = pendingTransactionalOffsetCommits.remove(producerId) val pendingOffsetsOpt = pendingTransactionalOffsetCommits.remove(producerId)
if (isCommit) { if (isCommit) {
pendingOffsetsOpt.foreach { pendingOffsets => pendingOffsetsOpt.foreach { pendingOffsets =>
pendingOffsets.forKeyValue { (topicPartition, commitRecordMetadataAndOffset) => pendingOffsets.foreachEntry { (topicPartition, commitRecordMetadataAndOffset) =>
if (commitRecordMetadataAndOffset.appendedBatchOffset.isEmpty) if (commitRecordMetadataAndOffset.appendedBatchOffset.isEmpty)
throw new IllegalStateException(s"Trying to complete a transactional offset commit for producerId $producerId " + throw new IllegalStateException(s"Trying to complete a transactional offset commit for producerId $producerId " +
s"and groupId $groupId even though the offset commit record itself hasn't been appended to the log.") s"and groupId $groupId even though the offset commit record itself hasn't been appended to the log.")
@ -746,7 +745,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
def removeOffsets(topicPartitions: Seq[TopicPartition]): immutable.Map[TopicPartition, OffsetAndMetadata] = { def removeOffsets(topicPartitions: Seq[TopicPartition]): immutable.Map[TopicPartition, OffsetAndMetadata] = {
topicPartitions.flatMap { topicPartition => topicPartitions.flatMap { topicPartition =>
pendingOffsetCommits.remove(topicPartition) pendingOffsetCommits.remove(topicPartition)
pendingTransactionalOffsetCommits.forKeyValue { (_, pendingOffsets) => pendingTransactionalOffsetCommits.foreachEntry { (_, pendingOffsets) =>
pendingOffsets.remove(topicPartition) pendingOffsets.remove(topicPartition)
} }
val removedOffset = offsets.remove(topicPartition) val removedOffset = offsets.remove(topicPartition)

View File

@ -30,7 +30,6 @@ import kafka.common.OffsetAndMetadata
import kafka.coordinator.group.GroupMetadataManager.maybeConvertOffsetCommitError import kafka.coordinator.group.GroupMetadataManager.maybeConvertOffsetCommitError
import kafka.server.ReplicaManager import kafka.server.ReplicaManager
import kafka.utils.CoreUtils.inLock import kafka.utils.CoreUtils.inLock
import kafka.utils.Implicits._
import kafka.utils._ import kafka.utils._
import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.compress.Compression import org.apache.kafka.common.compress.Compression
@ -393,7 +392,7 @@ class GroupMetadataManager(brokerId: Int,
val responseError = group.inLock { val responseError = group.inLock {
if (status.error == Errors.NONE) { if (status.error == Errors.NONE) {
if (!group.is(Dead)) { if (!group.is(Dead)) {
filteredOffsetMetadata.forKeyValue { (topicIdPartition, offsetAndMetadata) => filteredOffsetMetadata.foreachEntry { (topicIdPartition, offsetAndMetadata) =>
if (isTxnOffsetCommit) if (isTxnOffsetCommit)
group.onTxnOffsetCommitAppend(producerId, topicIdPartition, CommitRecordMetadataAndOffset(Some(status.baseOffset), offsetAndMetadata)) group.onTxnOffsetCommitAppend(producerId, topicIdPartition, CommitRecordMetadataAndOffset(Some(status.baseOffset), offsetAndMetadata))
else else
@ -409,7 +408,7 @@ class GroupMetadataManager(brokerId: Int,
if (!group.is(Dead)) { if (!group.is(Dead)) {
if (!group.hasPendingOffsetCommitsFromProducer(producerId)) if (!group.hasPendingOffsetCommitsFromProducer(producerId))
removeProducerGroup(producerId, group.groupId) removeProducerGroup(producerId, group.groupId)
filteredOffsetMetadata.forKeyValue { (topicIdPartition, offsetAndMetadata) => filteredOffsetMetadata.foreachEntry { (topicIdPartition, offsetAndMetadata) =>
if (isTxnOffsetCommit) if (isTxnOffsetCommit)
group.failPendingTxnOffsetCommit(producerId, topicIdPartition) group.failPendingTxnOffsetCommit(producerId, topicIdPartition)
else else
@ -705,11 +704,11 @@ class GroupMetadataManager(brokerId: Int,
}.partition { case (group, _) => loadedGroups.contains(group) } }.partition { case (group, _) => loadedGroups.contains(group) }
val pendingOffsetsByGroup = mutable.Map[String, mutable.Map[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]]]() val pendingOffsetsByGroup = mutable.Map[String, mutable.Map[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]]]()
pendingOffsets.forKeyValue { (producerId, producerOffsets) => pendingOffsets.foreachEntry { (producerId, producerOffsets) =>
producerOffsets.keySet.map(_.group).foreach(addProducerGroup(producerId, _)) producerOffsets.keySet.map(_.group).foreach(addProducerGroup(producerId, _))
producerOffsets producerOffsets
.groupBy(_._1.group) .groupBy(_._1.group)
.forKeyValue { (group, offsets) => .foreachEntry { (group, offsets) =>
val groupPendingOffsets = pendingOffsetsByGroup.getOrElseUpdate(group, mutable.Map.empty[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]]) val groupPendingOffsets = pendingOffsetsByGroup.getOrElseUpdate(group, mutable.Map.empty[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]])
val groupProducerOffsets = groupPendingOffsets.getOrElseUpdate(producerId, mutable.Map.empty[TopicPartition, CommitRecordMetadataAndOffset]) val groupProducerOffsets = groupPendingOffsets.getOrElseUpdate(producerId, mutable.Map.empty[TopicPartition, CommitRecordMetadataAndOffset])
groupProducerOffsets ++= offsets.map { case (groupTopicPartition, offset) => groupProducerOffsets ++= offsets.map { case (groupTopicPartition, offset) =>
@ -878,7 +877,7 @@ class GroupMetadataManager(brokerId: Int,
replicaManager.onlinePartition(appendPartition).foreach { partition => replicaManager.onlinePartition(appendPartition).foreach { partition =>
val tombstones = ArrayBuffer.empty[SimpleRecord] val tombstones = ArrayBuffer.empty[SimpleRecord]
removedOffsets.forKeyValue { (topicPartition, offsetAndMetadata) => removedOffsets.foreachEntry { (topicPartition, offsetAndMetadata) =>
trace(s"Removing expired/deleted offset and metadata for $groupId, $topicPartition: $offsetAndMetadata") trace(s"Removing expired/deleted offset and metadata for $groupId, $topicPartition: $offsetAndMetadata")
val commitKey = GroupMetadataManager.offsetCommitKey(groupId, topicPartition) val commitKey = GroupMetadataManager.offsetCommitKey(groupId, topicPartition)
tombstones += new SimpleRecord(timestamp, commitKey, null) tombstones += new SimpleRecord(timestamp, commitKey, null)
@ -971,7 +970,7 @@ class GroupMetadataManager(brokerId: Int,
} }
private def removeGroupFromAllProducers(groupId: String): Unit = openGroupsForProducer synchronized { private def removeGroupFromAllProducers(groupId: String): Unit = openGroupsForProducer synchronized {
openGroupsForProducer.forKeyValue { (_, groups) => openGroupsForProducer.foreachEntry { (_, groups) =>
groups.remove(groupId) groups.remove(groupId)
} }
} }

View File

@ -22,7 +22,6 @@ import kafka.coordinator.transaction.TransactionMarkerChannelManager.{LogAppendR
import java.util import java.util
import java.util.concurrent.{BlockingQueue, ConcurrentHashMap, LinkedBlockingQueue} import java.util.concurrent.{BlockingQueue, ConcurrentHashMap, LinkedBlockingQueue}
import kafka.server.{KafkaConfig, MetadataCache} import kafka.server.{KafkaConfig, MetadataCache}
import kafka.utils.Implicits._
import kafka.utils.{CoreUtils, Logging} import kafka.utils.{CoreUtils, Logging}
import org.apache.kafka.clients._ import org.apache.kafka.clients._
import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.metrics.Metrics
@ -147,7 +146,7 @@ class TxnMarkerQueue(@volatile var destination: Node) extends Logging {
} }
def forEachTxnTopicPartition[B](f:(Int, BlockingQueue[PendingCompleteTxnAndMarkerEntry]) => B): Unit = def forEachTxnTopicPartition[B](f:(Int, BlockingQueue[PendingCompleteTxnAndMarkerEntry]) => B): Unit =
markersPerTxnTopicPartition.forKeyValue { (partition, queue) => markersPerTxnTopicPartition.foreachEntry { (partition, queue) =>
if (!queue.isEmpty) f(partition, queue) if (!queue.isEmpty) f(partition, queue)
} }

View File

@ -23,7 +23,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock
import kafka.server.{MetadataCache, ReplicaManager} import kafka.server.{MetadataCache, ReplicaManager}
import kafka.utils.CoreUtils.{inReadLock, inWriteLock} import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
import kafka.utils.{Logging, Pool} import kafka.utils.{Logging, Pool}
import kafka.utils.Implicits._
import org.apache.kafka.common.config.TopicConfig import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.message.ListTransactionsResponseData import org.apache.kafka.common.message.ListTransactionsResponseData
@ -237,7 +236,7 @@ class TransactionStateManager(brokerId: Int,
private[transaction] def removeExpiredTransactionalIds(): Unit = { private[transaction] def removeExpiredTransactionalIds(): Unit = {
inReadLock(stateLock) { inReadLock(stateLock) {
transactionMetadataCache.forKeyValue { (partitionId, partitionCacheEntry) => transactionMetadataCache.foreachEntry { (partitionId, partitionCacheEntry) =>
val transactionPartition = new TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partitionId) val transactionPartition = new TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partitionId)
removeExpiredTransactionalIds(transactionPartition, partitionCacheEntry) removeExpiredTransactionalIds(transactionPartition, partitionCacheEntry)
} }
@ -250,7 +249,7 @@ class TransactionStateManager(brokerId: Int,
tombstoneRecords: MemoryRecords tombstoneRecords: MemoryRecords
): Unit = { ): Unit = {
def removeFromCacheCallback(responses: collection.Map[TopicPartition, PartitionResponse]): Unit = { def removeFromCacheCallback(responses: collection.Map[TopicPartition, PartitionResponse]): Unit = {
responses.forKeyValue { (topicPartition, response) => responses.foreachEntry { (topicPartition, response) =>
inReadLock(stateLock) { inReadLock(stateLock) {
transactionMetadataCache.get(topicPartition.partition).foreach { txnMetadataCacheEntry => transactionMetadataCache.get(topicPartition.partition).foreach { txnMetadataCacheEntry =>
expiredForPartition.foreach { idCoordinatorEpochAndMetadata => expiredForPartition.foreach { idCoordinatorEpochAndMetadata =>
@ -345,7 +344,7 @@ class TransactionStateManager(brokerId: Int,
} }
val states = new java.util.ArrayList[ListTransactionsResponseData.TransactionState] val states = new java.util.ArrayList[ListTransactionsResponseData.TransactionState]
transactionMetadataCache.forKeyValue { (_, cache) => transactionMetadataCache.foreachEntry { (_, cache) =>
cache.metadataPerTransactionalId.values.foreach { txnMetadata => cache.metadataPerTransactionalId.values.foreach { txnMetadata =>
txnMetadata.inLock { txnMetadata.inLock {
if (shouldInclude(txnMetadata)) { if (shouldInclude(txnMetadata)) {

View File

@ -35,7 +35,6 @@ import scala.jdk.CollectionConverters._
import scala.collection._ import scala.collection._
import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.ArrayBuffer
import scala.util.{Failure, Success, Try} import scala.util.{Failure, Success, Try}
import kafka.utils.Implicits._
import org.apache.kafka.common.config.TopicConfig import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.requests.{AbstractControlRequest, LeaderAndIsrRequest} import org.apache.kafka.common.requests.{AbstractControlRequest, LeaderAndIsrRequest}
import org.apache.kafka.image.TopicsImage import org.apache.kafka.image.TopicsImage
@ -699,7 +698,7 @@ class LogManager(logDirs: Seq[File],
} }
try { try {
jobs.forKeyValue { (dir, dirJobs) => jobs.foreachEntry { (dir, dirJobs) =>
if (waitForAllToComplete(dirJobs, if (waitForAllToComplete(dirJobs,
e => warn(s"There was an error in one of the threads during LogManager shutdown: ${e.getCause}"))) { e => warn(s"There was an error in one of the threads during LogManager shutdown: ${e.getCause}"))) {
val logs = logsInDir(localLogsByDir, dir) val logs = logsInDir(localLogsByDir, dir)

View File

@ -24,7 +24,6 @@ import com.typesafe.scalalogging.Logger
import kafka.network import kafka.network
import kafka.server.KafkaConfig import kafka.server.KafkaConfig
import kafka.utils.Logging import kafka.utils.Logging
import kafka.utils.Implicits._
import org.apache.kafka.common.config.ConfigResource import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.memory.MemoryPool import org.apache.kafka.common.memory.MemoryPool
import org.apache.kafka.common.message.EnvelopeResponseData import org.apache.kafka.common.message.EnvelopeResponseData
@ -465,7 +464,7 @@ class RequestChannel(val queueSize: Int,
requestQueue.take() requestQueue.take()
def updateErrorMetrics(apiKey: ApiKeys, errors: collection.Map[Errors, Integer]): Unit = { def updateErrorMetrics(apiKey: ApiKeys, errors: collection.Map[Errors, Integer]): Unit = {
errors.forKeyValue { (error, count) => errors.foreachEntry { (error, count) =>
metrics(apiKey.name).markErrorMeter(error, count) metrics(apiKey.name).markErrorMeter(error, count)
} }
} }

View File

@ -21,7 +21,6 @@ import java.util.concurrent.{CompletableFuture, CompletionStage}
import com.typesafe.scalalogging.Logger import com.typesafe.scalalogging.Logger
import kafka.server.{KafkaConfig, KafkaServer} import kafka.server.{KafkaConfig, KafkaServer}
import kafka.utils._ import kafka.utils._
import kafka.utils.Implicits._
import kafka.zk._ import kafka.zk._
import org.apache.kafka.common.Endpoint import org.apache.kafka.common.Endpoint
import org.apache.kafka.common.acl._ import org.apache.kafka.common.acl._
@ -107,7 +106,7 @@ object AclAuthorizer {
// be sure to force creation since the zkSslClientEnable property in the kafkaConfig could be false // be sure to force creation since the zkSslClientEnable property in the kafkaConfig could be false
val zkClientConfig = KafkaServer.zkClientConfigFromKafkaConfig(kafkaConfig, forceZkSslClientEnable = true) val zkClientConfig = KafkaServer.zkClientConfigFromKafkaConfig(kafkaConfig, forceZkSslClientEnable = true)
// add in any prefixed overlays // add in any prefixed overlays
ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.asScala.forKeyValue { (kafkaProp, sysProp) => ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.asScala.foreachEntry { (kafkaProp, sysProp) =>
configMap.get(AclAuthorizer.configPrefix + kafkaProp).foreach { prefixedValue => configMap.get(AclAuthorizer.configPrefix + kafkaProp).foreach { prefixedValue =>
zkClientConfig.setProperty(sysProp, zkClientConfig.setProperty(sysProp,
if (kafkaProp == ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG) if (kafkaProp == ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG)
@ -187,7 +186,7 @@ class AclAuthorizer extends Authorizer with Logging {
override def configure(javaConfigs: util.Map[String, _]): Unit = { override def configure(javaConfigs: util.Map[String, _]): Unit = {
val configs = javaConfigs.asScala val configs = javaConfigs.asScala
val props = new java.util.Properties() val props = new java.util.Properties()
configs.forKeyValue { (key, value) => props.put(key, value.toString.trim) } configs.foreachEntry { (key, value) => props.put(key, value.toString.trim) }
superUsers = configs.get(AclAuthorizer.SuperUsersProp).collect { superUsers = configs.get(AclAuthorizer.SuperUsersProp).collect {
case str: String if str.nonEmpty => str.split(";").map(s => SecurityUtils.parseKafkaPrincipal(s.trim)).toSet case str: String if str.nonEmpty => str.split(";").map(s => SecurityUtils.parseKafkaPrincipal(s.trim)).toSet
@ -251,7 +250,7 @@ class AclAuthorizer extends Authorizer with Logging {
if (aclsToCreate.nonEmpty) { if (aclsToCreate.nonEmpty) {
lock synchronized { lock synchronized {
aclsToCreate.forKeyValue { (resource, aclsWithIndex) => aclsToCreate.foreachEntry { (resource, aclsWithIndex) =>
try { try {
updateResourceAcls(resource) { currentAcls => updateResourceAcls(resource) { currentAcls =>
val newAcls = aclsWithIndex.map { case (acl, _) => new AclEntry(acl.entry) } val newAcls = aclsWithIndex.map { case (acl, _) => new AclEntry(acl.entry) }
@ -299,7 +298,7 @@ class AclAuthorizer extends Authorizer with Logging {
resource -> matchingFilters resource -> matchingFilters
}.toMap.filter(_._2.nonEmpty) }.toMap.filter(_._2.nonEmpty)
resourcesToUpdate.forKeyValue { (resource, matchingFilters) => resourcesToUpdate.foreachEntry { (resource, matchingFilters) =>
val resourceBindingsBeingDeleted = new mutable.HashMap[AclBinding, Int]() val resourceBindingsBeingDeleted = new mutable.HashMap[AclBinding, Int]()
try { try {
updateResourceAcls(resource) { currentAcls => updateResourceAcls(resource) { currentAcls =>
@ -334,7 +333,7 @@ class AclAuthorizer extends Authorizer with Logging {
override def acls(filter: AclBindingFilter): lang.Iterable[AclBinding] = { override def acls(filter: AclBindingFilter): lang.Iterable[AclBinding] = {
val aclBindings = new util.ArrayList[AclBinding]() val aclBindings = new util.ArrayList[AclBinding]()
aclCache.forKeyValue { case (resource, versionedAcls) => aclCache.foreachEntry { case (resource, versionedAcls) =>
versionedAcls.acls.foreach { acl => versionedAcls.acls.foreach { acl =>
val binding = new AclBinding(resource, acl.ace) val binding = new AclBinding(resource, acl.ace)
if (filter.matches(binding)) if (filter.matches(binding))
@ -552,7 +551,7 @@ class AclAuthorizer extends Authorizer with Logging {
aclCacheSnapshot aclCacheSnapshot
.from(new ResourcePattern(resourceType, resourceName, PatternType.PREFIXED)) .from(new ResourcePattern(resourceType, resourceName, PatternType.PREFIXED))
.to(new ResourcePattern(resourceType, resourceName.take(1), PatternType.PREFIXED)) .to(new ResourcePattern(resourceType, resourceName.take(1), PatternType.PREFIXED))
.forKeyValue { (resource, acls) => .foreachEntry { (resource, acls) =>
if (resourceName.startsWith(resource.name)) prefixed ++= acls.acls if (resourceName.startsWith(resource.name)) prefixed ++= acls.acls
} }

View File

@ -18,7 +18,6 @@
package kafka.server package kafka.server
import kafka.cluster.BrokerEndPoint import kafka.cluster.BrokerEndPoint
import kafka.utils.Implicits._
import kafka.utils.Logging import kafka.utils.Logging
import org.apache.kafka.common.{TopicPartition, Uuid} import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.common.utils.Utils import org.apache.kafka.common.utils.Utils
@ -66,11 +65,11 @@ abstract class AbstractFetcherManager[T <: AbstractFetcherThread](val name: Stri
def resizeThreadPool(newSize: Int): Unit = { def resizeThreadPool(newSize: Int): Unit = {
def migratePartitions(newSize: Int): Unit = { def migratePartitions(newSize: Int): Unit = {
val allRemovedPartitionsMap = mutable.Map[TopicPartition, InitialFetchState]() val allRemovedPartitionsMap = mutable.Map[TopicPartition, InitialFetchState]()
fetcherThreadMap.forKeyValue { (id, thread) => fetcherThreadMap.foreachEntry { (id, thread) =>
val partitionStates = thread.removeAllPartitions() val partitionStates = thread.removeAllPartitions()
if (id.fetcherId >= newSize) if (id.fetcherId >= newSize)
thread.shutdown() thread.shutdown()
partitionStates.forKeyValue { (topicPartition, currentFetchState) => partitionStates.foreachEntry { (topicPartition, currentFetchState) =>
val initialFetchState = InitialFetchState(currentFetchState.topicId, thread.leader.brokerEndPoint(), val initialFetchState = InitialFetchState(currentFetchState.topicId, thread.leader.brokerEndPoint(),
currentLeaderEpoch = currentFetchState.currentLeaderEpoch, currentLeaderEpoch = currentFetchState.currentLeaderEpoch,
initOffset = currentFetchState.fetchOffset) initOffset = currentFetchState.fetchOffset)

View File

@ -20,7 +20,6 @@ package kafka.server
import com.yammer.metrics.core.Meter import com.yammer.metrics.core.Meter
import kafka.server.AbstractFetcherThread.{ReplicaFetch, ResultWithPartitions} import kafka.server.AbstractFetcherThread.{ReplicaFetch, ResultWithPartitions}
import kafka.utils.CoreUtils.inLock import kafka.utils.CoreUtils.inLock
import kafka.utils.Implicits._
import kafka.utils.{DelayedItem, Logging, Pool} import kafka.utils.{DelayedItem, Logging, Pool}
import org.apache.kafka.common.errors._ import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.PartitionStates import org.apache.kafka.common.internals.PartitionStates
@ -256,7 +255,7 @@ abstract class AbstractFetcherThread(name: String,
val fetchOffsets = mutable.HashMap.empty[TopicPartition, OffsetTruncationState] val fetchOffsets = mutable.HashMap.empty[TopicPartition, OffsetTruncationState]
val partitionsWithError = mutable.HashSet.empty[TopicPartition] val partitionsWithError = mutable.HashSet.empty[TopicPartition]
fetchedEpochs.forKeyValue { (tp, leaderEpochOffset) => fetchedEpochs.foreachEntry { (tp, leaderEpochOffset) =>
if (partitionStates.contains(tp)) { if (partitionStates.contains(tp)) {
Errors.forCode(leaderEpochOffset.errorCode) match { Errors.forCode(leaderEpochOffset.errorCode) match {
case Errors.NONE => case Errors.NONE =>
@ -329,7 +328,7 @@ abstract class AbstractFetcherThread(name: String,
if (responseData.nonEmpty) { if (responseData.nonEmpty) {
// process fetched data // process fetched data
inLock(partitionMapLock) { inLock(partitionMapLock) {
responseData.forKeyValue { (topicPartition, partitionData) => responseData.foreachEntry { (topicPartition, partitionData) =>
Option(partitionStates.stateValue(topicPartition)).foreach { currentFetchState => Option(partitionStates.stateValue(topicPartition)).foreach { currentFetchState =>
// It's possible that a partition is removed and re-added or truncated when there is a pending fetch request. // It's possible that a partition is removed and re-added or truncated when there is a pending fetch request.
// In this case, we only want to process the fetch response if the partition state is ready for fetch and // In this case, we only want to process the fetch response if the partition state is ready for fetch and
@ -508,7 +507,7 @@ abstract class AbstractFetcherThread(name: String,
try { try {
failedPartitions.removeAll(initialFetchStates.keySet) failedPartitions.removeAll(initialFetchStates.keySet)
initialFetchStates.forKeyValue { (tp, initialFetchState) => initialFetchStates.foreachEntry { (tp, initialFetchState) =>
val currentState = partitionStates.stateValue(tp) val currentState = partitionStates.stateValue(tp)
val updatedState = partitionFetchState(tp, initialFetchState, currentState) val updatedState = partitionFetchState(tp, initialFetchState, currentState)
partitionStates.updateAndMoveToEnd(tp, updatedState) partitionStates.updateAndMoveToEnd(tp, updatedState)

View File

@ -18,7 +18,6 @@
package kafka.server package kafka.server
import kafka.server.AddPartitionsToTxnManager.{VerificationFailureRateMetricName, VerificationTimeMsMetricName} import kafka.server.AddPartitionsToTxnManager.{VerificationFailureRateMetricName, VerificationTimeMsMetricName}
import kafka.utils.Implicits.MapExtensionMethods
import kafka.utils.Logging import kafka.utils.Logging
import org.apache.kafka.clients.{ClientResponse, NetworkClient, RequestCompletionHandler} import org.apache.kafka.clients.{ClientResponse, NetworkClient, RequestCompletionHandler}
import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.internals.Topic
@ -99,7 +98,7 @@ class AddPartitionsToTxnManager(
callback(topicPartitions.map(tp => tp -> Errors.COORDINATOR_NOT_AVAILABLE).toMap) callback(topicPartitions.map(tp => tp -> Errors.COORDINATOR_NOT_AVAILABLE).toMap)
} else { } else {
val topicCollection = new AddPartitionsToTxnTopicCollection() val topicCollection = new AddPartitionsToTxnTopicCollection()
topicPartitions.groupBy(_.topic).forKeyValue { (topic, tps) => topicPartitions.groupBy(_.topic).foreachEntry { (topic, tps) =>
topicCollection.add(new AddPartitionsToTxnTopic() topicCollection.add(new AddPartitionsToTxnTopic()
.setName(topic) .setName(topic)
.setPartitions(tps.map(tp => Int.box(tp.partition)).toList.asJava)) .setPartitions(tps.map(tp => Int.box(tp.partition)).toList.asJava))

View File

@ -20,7 +20,6 @@ package kafka.server
import java.lang.{Byte => JByte} import java.lang.{Byte => JByte}
import java.util.Collections import java.util.Collections
import kafka.network.RequestChannel import kafka.network.RequestChannel
import kafka.utils.CoreUtils
import org.apache.kafka.clients.admin.EndpointType import org.apache.kafka.clients.admin.EndpointType
import org.apache.kafka.common.acl.AclOperation import org.apache.kafka.common.acl.AclOperation
import org.apache.kafka.common.acl.AclOperation.DESCRIBE import org.apache.kafka.common.acl.AclOperation.DESCRIBE
@ -105,7 +104,7 @@ class AuthHelper(authorizer: Option[Authorizer]) {
logIfDenied: Boolean = true)(resourceName: T => String): Set[String] = { logIfDenied: Boolean = true)(resourceName: T => String): Set[String] = {
authorizer match { authorizer match {
case Some(authZ) => case Some(authZ) =>
val resourceNameToCount = CoreUtils.groupMapReduce(resources)(resourceName)(_ => 1)(_ + _) val resourceNameToCount = resources.groupMapReduce(resourceName)(_ => 1)(_ + _)
val actions = resourceNameToCount.iterator.map { case (resourceName, count) => val actions = resourceNameToCount.iterator.map { case (resourceName, count) =>
val resource = new ResourcePattern(resourceType, resourceName, PatternType.LITERAL) val resource = new ResourcePattern(resourceType, resourceName, PatternType.LITERAL)
new Action(operation, resource, count, logIfAllowed, logIfDenied) new Action(operation, resource, count, logIfAllowed, logIfDenied)

View File

@ -24,7 +24,6 @@ import kafka.log.UnifiedLog
import kafka.network.ConnectionQuotas import kafka.network.ConnectionQuotas
import kafka.server.Constants._ import kafka.server.Constants._
import kafka.server.QuotaFactory.QuotaManagers import kafka.server.QuotaFactory.QuotaManagers
import kafka.utils.Implicits._
import kafka.utils.Logging import kafka.utils.Logging
import org.apache.kafka.server.config.{QuotaConfigs, ReplicationConfigs, ZooKeeperInternals} import org.apache.kafka.server.config.{QuotaConfigs, ReplicationConfigs, ZooKeeperInternals}
import org.apache.kafka.common.config.TopicConfig import org.apache.kafka.common.config.TopicConfig
@ -64,7 +63,7 @@ class TopicConfigHandler(private val replicaManager: ReplicaManager,
// Validate the configurations. // Validate the configurations.
val configNamesToExclude = excludedConfigs(topic, topicConfig) val configNamesToExclude = excludedConfigs(topic, topicConfig)
val props = new Properties() val props = new Properties()
topicConfig.asScala.forKeyValue { (key, value) => topicConfig.asScala.foreachEntry { (key, value) =>
if (!configNamesToExclude.contains(key)) props.put(key, value) if (!configNamesToExclude.contains(key)) props.put(key, value)
} }

View File

@ -20,7 +20,6 @@ package kafka.server
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import kafka.utils.Implicits._
import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.message.DeleteRecordsResponseData import org.apache.kafka.common.message.DeleteRecordsResponseData
import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.protocol.Errors
@ -49,7 +48,7 @@ class DelayedDeleteRecords(delayMs: Long,
extends DelayedOperation(delayMs) { extends DelayedOperation(delayMs) {
// first update the acks pending variable according to the error code // first update the acks pending variable according to the error code
deleteRecordsStatus.forKeyValue { (topicPartition, status) => deleteRecordsStatus.foreachEntry { (topicPartition, status) =>
if (status.responseStatus.errorCode == Errors.NONE.code) { if (status.responseStatus.errorCode == Errors.NONE.code) {
// Timeout error state will be cleared when required acks are received // Timeout error state will be cleared when required acks are received
status.acksPending = true status.acksPending = true
@ -70,7 +69,7 @@ class DelayedDeleteRecords(delayMs: Long,
*/ */
override def tryComplete(): Boolean = { override def tryComplete(): Boolean = {
// check for each partition if it still has pending acks // check for each partition if it still has pending acks
deleteRecordsStatus.forKeyValue { (topicPartition, status) => deleteRecordsStatus.foreachEntry { (topicPartition, status) =>
trace(s"Checking delete records satisfaction for $topicPartition, current status $status") trace(s"Checking delete records satisfaction for $topicPartition, current status $status")
// skip those partitions that have already been satisfied // skip those partitions that have already been satisfied
if (status.acksPending) { if (status.acksPending) {
@ -106,7 +105,7 @@ class DelayedDeleteRecords(delayMs: Long,
} }
override def onExpiration(): Unit = { override def onExpiration(): Unit = {
deleteRecordsStatus.forKeyValue { (topicPartition, status) => deleteRecordsStatus.foreachEntry { (topicPartition, status) =>
if (status.acksPending) { if (status.acksPending) {
DelayedDeleteRecordsMetrics.recordExpiration(topicPartition) DelayedDeleteRecordsMetrics.recordExpiration(topicPartition)
} }

View File

@ -21,7 +21,6 @@ import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.Lock import java.util.concurrent.locks.Lock
import com.typesafe.scalalogging.Logger import com.typesafe.scalalogging.Logger
import com.yammer.metrics.core.Meter import com.yammer.metrics.core.Meter
import kafka.utils.Implicits._
import kafka.utils.Pool import kafka.utils.Pool
import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.protocol.Errors
@ -65,7 +64,7 @@ class DelayedProduce(delayMs: Long,
override lazy val logger: Logger = DelayedProduce.logger override lazy val logger: Logger = DelayedProduce.logger
// first update the acks pending variable according to the error code // first update the acks pending variable according to the error code
produceMetadata.produceStatus.forKeyValue { (topicPartition, status) => produceMetadata.produceStatus.foreachEntry { (topicPartition, status) =>
if (status.responseStatus.error == Errors.NONE) { if (status.responseStatus.error == Errors.NONE) {
// Timeout error state will be cleared when required acks are received // Timeout error state will be cleared when required acks are received
status.acksPending = true status.acksPending = true
@ -90,7 +89,7 @@ class DelayedProduce(delayMs: Long,
*/ */
override def tryComplete(): Boolean = { override def tryComplete(): Boolean = {
// check for each partition if it still has pending acks // check for each partition if it still has pending acks
produceMetadata.produceStatus.forKeyValue { (topicPartition, status) => produceMetadata.produceStatus.foreachEntry { (topicPartition, status) =>
trace(s"Checking produce satisfaction for $topicPartition, current status $status") trace(s"Checking produce satisfaction for $topicPartition, current status $status")
// skip those partitions that have already been satisfied // skip those partitions that have already been satisfied
if (status.acksPending) { if (status.acksPending) {
@ -119,7 +118,7 @@ class DelayedProduce(delayMs: Long,
} }
override def onExpiration(): Unit = { override def onExpiration(): Unit = {
produceMetadata.produceStatus.forKeyValue { (topicPartition, status) => produceMetadata.produceStatus.foreachEntry { (topicPartition, status) =>
if (status.acksPending) { if (status.acksPending) {
debug(s"Expiring produce request for partition $topicPartition with status $status") debug(s"Expiring produce request for partition $topicPartition with status $status")
DelayedProduceMetrics.recordExpiration(topicPartition) DelayedProduceMetrics.recordExpiration(topicPartition)

View File

@ -18,7 +18,6 @@ package kafka.server
import com.yammer.metrics.core.Meter import com.yammer.metrics.core.Meter
import kafka.log.AsyncOffsetReadFutureHolder import kafka.log.AsyncOffsetReadFutureHolder
import kafka.utils.Implicits._
import kafka.utils.Pool import kafka.utils.Pool
import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.ApiException import org.apache.kafka.common.errors.ApiException
@ -58,7 +57,7 @@ class DelayedRemoteListOffsets(delayMs: Long,
// Mark the status as completed, if there is no async task to track. // Mark the status as completed, if there is no async task to track.
// If there is a task to track, then build the response as REQUEST_TIMED_OUT by default. // If there is a task to track, then build the response as REQUEST_TIMED_OUT by default.
metadata.statusByPartition.forKeyValue { (topicPartition, status) => metadata.statusByPartition.foreachEntry { (topicPartition, status) =>
status.completed = status.futureHolderOpt.isEmpty status.completed = status.futureHolderOpt.isEmpty
if (status.futureHolderOpt.isDefined) { if (status.futureHolderOpt.isDefined) {
status.responseOpt = Some(buildErrorResponse(Errors.REQUEST_TIMED_OUT, topicPartition.partition())) status.responseOpt = Some(buildErrorResponse(Errors.REQUEST_TIMED_OUT, topicPartition.partition()))
@ -70,7 +69,7 @@ class DelayedRemoteListOffsets(delayMs: Long,
* Call-back to execute when a delayed operation gets expired and hence forced to complete. * Call-back to execute when a delayed operation gets expired and hence forced to complete.
*/ */
override def onExpiration(): Unit = { override def onExpiration(): Unit = {
metadata.statusByPartition.forKeyValue { (topicPartition, status) => metadata.statusByPartition.foreachEntry { (topicPartition, status) =>
if (!status.completed) { if (!status.completed) {
debug(s"Expiring list offset request for partition $topicPartition with status $status") debug(s"Expiring list offset request for partition $topicPartition with status $status")
status.futureHolderOpt.foreach(futureHolder => futureHolder.jobFuture.cancel(true)) status.futureHolderOpt.foreach(futureHolder => futureHolder.jobFuture.cancel(true))
@ -100,7 +99,7 @@ class DelayedRemoteListOffsets(delayMs: Long,
*/ */
override def tryComplete(): Boolean = { override def tryComplete(): Boolean = {
var completable = true var completable = true
metadata.statusByPartition.forKeyValue { (partition, status) => metadata.statusByPartition.foreachEntry { (partition, status) =>
if (!status.completed) { if (!status.completed) {
status.futureHolderOpt.foreach { futureHolder => status.futureHolderOpt.foreach { futureHolder =>
if (futureHolder.taskFuture.isDone) { if (futureHolder.taskFuture.isDone) {

View File

@ -26,7 +26,6 @@ import kafka.log.{LogCleaner, LogManager}
import kafka.network.{DataPlaneAcceptor, SocketServer} import kafka.network.{DataPlaneAcceptor, SocketServer}
import kafka.server.DynamicBrokerConfig._ import kafka.server.DynamicBrokerConfig._
import kafka.utils.{CoreUtils, Logging} import kafka.utils.{CoreUtils, Logging}
import kafka.utils.Implicits._
import kafka.zk.{AdminZkClient, KafkaZkClient} import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.common.Reconfigurable import org.apache.kafka.common.Reconfigurable
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
@ -399,7 +398,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
props.setProperty(configName, passwordEncoder.encode(new Password(value))) props.setProperty(configName, passwordEncoder.encode(new Password(value)))
} }
} }
configProps.asScala.forKeyValue { (name, value) => configProps.asScala.foreachEntry { (name, value) =>
if (isPasswordConfig(name)) if (isPasswordConfig(name))
encodePassword(name, value) encodePassword(name, value)
} }
@ -436,7 +435,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
} }
} }
props.asScala.forKeyValue { (name, value) => props.asScala.foreachEntry { (name, value) =>
if (isPasswordConfig(name)) if (isPasswordConfig(name))
decodePassword(name, value) decodePassword(name, value)
} }
@ -451,7 +450,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
val props = persistentProps.clone().asInstanceOf[Properties] val props = persistentProps.clone().asInstanceOf[Properties]
if (props.asScala.keySet.exists(isPasswordConfig)) { if (props.asScala.keySet.exists(isPasswordConfig)) {
maybeCreatePasswordEncoder(kafkaConfig.passwordEncoderOldSecret).foreach { passwordDecoder => maybeCreatePasswordEncoder(kafkaConfig.passwordEncoderOldSecret).foreach { passwordDecoder =>
persistentProps.asScala.forKeyValue { (configName, value) => persistentProps.asScala.foreachEntry { (configName, value) =>
if (isPasswordConfig(configName) && value != null) { if (isPasswordConfig(configName) && value != null) {
val decoded = try { val decoded = try {
Some(passwordDecoder.decode(value).value) Some(passwordDecoder.decode(value).value)
@ -545,7 +544,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
* `props` (even though `log.roll.hours` is secondary to `log.roll.ms`). * `props` (even though `log.roll.hours` is secondary to `log.roll.ms`).
*/ */
private def overrideProps(props: mutable.Map[String, String], propsOverride: mutable.Map[String, String]): Unit = { private def overrideProps(props: mutable.Map[String, String], propsOverride: mutable.Map[String, String]): Unit = {
propsOverride.forKeyValue { (k, v) => propsOverride.foreachEntry { (k, v) =>
// Remove synonyms of `k` to ensure the right precedence is applied. But disable `matchListenerOverride` // Remove synonyms of `k` to ensure the right precedence is applied. But disable `matchListenerOverride`
// so that base configs corresponding to listener configs are not removed. Base configs should not be removed // so that base configs corresponding to listener configs are not removed. Base configs should not be removed
// since they may be used by other listeners. It is ok to retain them in `props` since base configs cannot be // since they may be used by other listeners. It is ok to retain them in `props` since base configs cannot be
@ -916,7 +915,7 @@ class DynamicMetricReporterState(brokerId: Int, config: KafkaConfig, metrics: Me
updatedConfigs: util.Map[String, _]): Unit = { updatedConfigs: util.Map[String, _]): Unit = {
val props = new util.HashMap[String, AnyRef] val props = new util.HashMap[String, AnyRef]
updatedConfigs.forEach((k, v) => props.put(k, v.asInstanceOf[AnyRef])) updatedConfigs.forEach((k, v) => props.put(k, v.asInstanceOf[AnyRef]))
propsOverride.forKeyValue((k, v) => props.put(k, v)) propsOverride.foreachEntry((k, v) => props.put(k, v))
val reporters = dynamicConfig.currentKafkaConfig.getConfiguredInstances(reporterClasses, classOf[MetricsReporter], props) val reporters = dynamicConfig.currentKafkaConfig.getConfiguredInstances(reporterClasses, classOf[MetricsReporter], props)
// Call notifyMetricsReporters first to satisfy the contract for MetricsReporter.contextChange, // Call notifyMetricsReporters first to satisfy the contract for MetricsReporter.contextChange,
@ -1058,7 +1057,7 @@ class DynamicListenerConfig(server: KafkaBroker) extends BrokerReconfigurable wi
newAdvertisedListeners: Map[ListenerName, EndPoint] newAdvertisedListeners: Map[ListenerName, EndPoint]
): Boolean = { ): Boolean = {
if (oldAdvertisedListeners.size != newAdvertisedListeners.size) return true if (oldAdvertisedListeners.size != newAdvertisedListeners.size) return true
oldAdvertisedListeners.forKeyValue { oldAdvertisedListeners.foreachEntry {
case (oldListenerName, oldEndpoint) => case (oldListenerName, oldEndpoint) =>
newAdvertisedListeners.get(oldListenerName) match { newAdvertisedListeners.get(oldListenerName) match {
case None => return true case None => return true

View File

@ -24,7 +24,6 @@ import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota}
import kafka.server.handlers.DescribeTopicPartitionsRequestHandler import kafka.server.handlers.DescribeTopicPartitionsRequestHandler
import kafka.server.metadata.{ConfigRepository, KRaftMetadataCache} import kafka.server.metadata.{ConfigRepository, KRaftMetadataCache}
import kafka.server.share.SharePartitionManager import kafka.server.share.SharePartitionManager
import kafka.utils.Implicits._
import kafka.utils.{CoreUtils, Logging} import kafka.utils.{CoreUtils, Logging}
import org.apache.kafka.admin.AdminUtils import org.apache.kafka.admin.AdminUtils
import org.apache.kafka.clients.CommonClientConfigs import org.apache.kafka.clients.CommonClientConfigs
@ -344,7 +343,7 @@ class KafkaApis(val requestChannel: RequestChannel,
partitionStates) partitionStates)
// Clear the coordinator caches in case we were the leader. In the case of a reassignment, we // Clear the coordinator caches in case we were the leader. In the case of a reassignment, we
// cannot rely on the LeaderAndIsr API for this since it is only sent to active replicas. // cannot rely on the LeaderAndIsr API for this since it is only sent to active replicas.
result.forKeyValue { (topicPartition, error) => result.foreachEntry { (topicPartition, error) =>
if (error == Errors.NONE) { if (error == Errors.NONE) {
val partitionState = partitionStates(topicPartition) val partitionState = partitionStates(topicPartition)
if (topicPartition.topic == GROUP_METADATA_TOPIC_NAME if (topicPartition.topic == GROUP_METADATA_TOPIC_NAME
@ -662,7 +661,7 @@ class KafkaApis(val requestChannel: RequestChannel,
var errorInResponse = false var errorInResponse = false
val nodeEndpoints = new mutable.HashMap[Int, Node] val nodeEndpoints = new mutable.HashMap[Int, Node]
mergedResponseStatus.forKeyValue { (topicPartition, status) => mergedResponseStatus.foreachEntry { (topicPartition, status) =>
if (status.error != Errors.NONE) { if (status.error != Errors.NONE) {
errorInResponse = true errorInResponse = true
debug("Produce request with correlation id %d from client %s on partition %s failed due to %s".format( debug("Produce request with correlation id %d from client %s on partition %s failed due to %s".format(
@ -732,7 +731,7 @@ class KafkaApis(val requestChannel: RequestChannel,
} }
def processingStatsCallback(processingStats: FetchResponseStats): Unit = { def processingStatsCallback(processingStats: FetchResponseStats): Unit = {
processingStats.forKeyValue { (tp, info) => processingStats.foreachEntry { (tp, info) =>
updateRecordConversionStats(request, tp, info) updateRecordConversionStats(request, tp, info)
} }
} }
@ -2208,7 +2207,7 @@ class KafkaApis(val requestChannel: RequestChannel,
// the callback for sending a DeleteRecordsResponse // the callback for sending a DeleteRecordsResponse
def sendResponseCallback(authorizedTopicResponses: Map[TopicPartition, DeleteRecordsPartitionResult]): Unit = { def sendResponseCallback(authorizedTopicResponses: Map[TopicPartition, DeleteRecordsPartitionResult]): Unit = {
val mergedResponseStatus = authorizedTopicResponses ++ unauthorizedTopicResponses ++ nonExistingTopicResponses val mergedResponseStatus = authorizedTopicResponses ++ unauthorizedTopicResponses ++ nonExistingTopicResponses
mergedResponseStatus.forKeyValue { (topicPartition, status) => mergedResponseStatus.foreachEntry { (topicPartition, status) =>
if (status.errorCode != Errors.NONE.code) { if (status.errorCode != Errors.NONE.code) {
debug("DeleteRecordsRequest with correlation id %d from client %s on partition %s failed due to %s".format( debug("DeleteRecordsRequest with correlation id %d from client %s on partition %s failed due to %s".format(
request.header.correlationId, request.header.correlationId,
@ -2485,7 +2484,7 @@ class KafkaApis(val requestChannel: RequestChannel,
entriesPerPartition = controlRecords, entriesPerPartition = controlRecords,
requestLocal = requestLocal, requestLocal = requestLocal,
responseCallback = errors => { responseCallback = errors => {
errors.forKeyValue { (tp, partitionResponse) => errors.foreachEntry { (tp, partitionResponse) =>
markerResults.put(tp, partitionResponse.error) markerResults.put(tp, partitionResponse.error)
} }
maybeComplete() maybeComplete()
@ -3328,11 +3327,11 @@ class KafkaApis(val requestChannel: RequestChannel,
val electionResults = new util.ArrayList[ReplicaElectionResult]() val electionResults = new util.ArrayList[ReplicaElectionResult]()
adjustedResults adjustedResults
.groupBy { case (tp, _) => tp.topic } .groupBy { case (tp, _) => tp.topic }
.forKeyValue { (topic, ps) => .foreachEntry { (topic, ps) =>
val electionResult = new ReplicaElectionResult() val electionResult = new ReplicaElectionResult()
electionResult.setTopic(topic) electionResult.setTopic(topic)
ps.forKeyValue { (topicPartition, error) => ps.foreachEntry { (topicPartition, error) =>
val partitionResult = new PartitionResult() val partitionResult = new PartitionResult()
partitionResult.setPartitionId(topicPartition.partition) partitionResult.setPartitionId(topicPartition.partition)
partitionResult.setErrorCode(error.error.code) partitionResult.setErrorCode(error.error.code)

View File

@ -21,7 +21,6 @@ import kafka.cluster.BrokerEndPoint
import java.util.{Collections, Optional} import java.util.{Collections, Optional}
import kafka.server.AbstractFetcherThread.{ReplicaFetch, ResultWithPartitions} import kafka.server.AbstractFetcherThread.{ReplicaFetch, ResultWithPartitions}
import kafka.utils.Implicits.MapExtensionMethods
import kafka.utils.Logging import kafka.utils.Logging
import org.apache.kafka.clients.FetchSessionHandler import org.apache.kafka.clients.FetchSessionHandler
import org.apache.kafka.common.errors.KafkaStorageException import org.apache.kafka.common.errors.KafkaStorageException
@ -141,7 +140,7 @@ class RemoteLeaderEndPoint(logPrefix: String,
} }
val topics = new OffsetForLeaderTopicCollection(partitions.size) val topics = new OffsetForLeaderTopicCollection(partitions.size)
partitions.forKeyValue { (topicPartition, epochData) => partitions.foreachEntry { (topicPartition, epochData) =>
var topic = topics.find(topicPartition.topic) var topic = topics.find(topicPartition.topic)
if (topic == null) { if (topic == null) {
topic = new OffsetForLeaderTopic().setTopic(topicPartition.topic) topic = new OffsetForLeaderTopic().setTopic(topicPartition.topic)
@ -182,7 +181,7 @@ class RemoteLeaderEndPoint(logPrefix: String,
val partitionsWithError = mutable.Set[TopicPartition]() val partitionsWithError = mutable.Set[TopicPartition]()
val builder = fetchSessionHandler.newBuilder(partitions.size, false) val builder = fetchSessionHandler.newBuilder(partitions.size, false)
partitions.forKeyValue { (topicPartition, fetchState) => partitions.foreachEntry { (topicPartition, fetchState) =>
// We will not include a replica in the fetch request if it should be throttled. // We will not include a replica in the fetch request if it should be throttled.
if (fetchState.isReadyForFetch && !shouldFollowerThrottle(quota, fetchState, topicPartition)) { if (fetchState.isReadyForFetch && !shouldFollowerThrottle(quota, fetchState, topicPartition)) {
try { try {

View File

@ -25,7 +25,6 @@ import kafka.server.HostedPartition.Online
import kafka.server.QuotaFactory.QuotaManagers import kafka.server.QuotaFactory.QuotaManagers
import kafka.server.ReplicaManager.{AtMinIsrPartitionCountMetricName, FailedIsrUpdatesPerSecMetricName, IsrExpandsPerSecMetricName, IsrShrinksPerSecMetricName, LeaderCountMetricName, OfflineReplicaCountMetricName, PartitionCountMetricName, PartitionsWithLateTransactionsCountMetricName, ProducerIdCountMetricName, ReassigningPartitionsMetricName, UnderMinIsrPartitionCountMetricName, UnderReplicatedPartitionsMetricName, createLogReadResult, isListOffsetsTimestampUnsupported} import kafka.server.ReplicaManager.{AtMinIsrPartitionCountMetricName, FailedIsrUpdatesPerSecMetricName, IsrExpandsPerSecMetricName, IsrShrinksPerSecMetricName, LeaderCountMetricName, OfflineReplicaCountMetricName, PartitionCountMetricName, PartitionsWithLateTransactionsCountMetricName, ProducerIdCountMetricName, ReassigningPartitionsMetricName, UnderMinIsrPartitionCountMetricName, UnderReplicatedPartitionsMetricName, createLogReadResult, isListOffsetsTimestampUnsupported}
import kafka.server.metadata.ZkMetadataCache import kafka.server.metadata.ZkMetadataCache
import kafka.utils.Implicits._
import kafka.utils._ import kafka.utils._
import kafka.zk.KafkaZkClient import kafka.zk.KafkaZkClient
import org.apache.kafka.common.errors._ import org.apache.kafka.common.errors._
@ -436,7 +435,7 @@ class ReplicaManager(val config: KafkaConfig,
// First, stop the partitions. This will shutdown the fetchers and other managers // First, stop the partitions. This will shutdown the fetchers and other managers
val partitionsToStop = strayPartitions.map(tp => StopPartition(tp, deleteLocalLog = false)).toSet val partitionsToStop = strayPartitions.map(tp => StopPartition(tp, deleteLocalLog = false)).toSet
stopPartitions(partitionsToStop).forKeyValue { (topicPartition, exception) => stopPartitions(partitionsToStop).foreachEntry { (topicPartition, exception) =>
error(s"Unable to stop stray partition $topicPartition", exception) error(s"Unable to stop stray partition $topicPartition", exception)
} }
@ -512,7 +511,7 @@ class ReplicaManager(val config: KafkaConfig,
stateChangeLogger.info(s"Handling StopReplica request correlationId $correlationId from controller " + stateChangeLogger.info(s"Handling StopReplica request correlationId $correlationId from controller " +
s"$controllerId for ${partitionStates.size} partitions") s"$controllerId for ${partitionStates.size} partitions")
if (stateChangeLogger.isTraceEnabled) if (stateChangeLogger.isTraceEnabled)
partitionStates.forKeyValue { (topicPartition, partitionState) => partitionStates.foreachEntry { (topicPartition, partitionState) =>
stateChangeLogger.trace(s"Received StopReplica request $partitionState " + stateChangeLogger.trace(s"Received StopReplica request $partitionState " +
s"correlation id $correlationId from controller $controllerId " + s"correlation id $correlationId from controller $controllerId " +
s"epoch $controllerEpoch for partition $topicPartition") s"epoch $controllerEpoch for partition $topicPartition")
@ -529,7 +528,7 @@ class ReplicaManager(val config: KafkaConfig,
this.controllerEpoch = controllerEpoch this.controllerEpoch = controllerEpoch
val stoppedPartitions = mutable.Buffer.empty[StopPartition] val stoppedPartitions = mutable.Buffer.empty[StopPartition]
partitionStates.forKeyValue { (topicPartition, partitionState) => partitionStates.foreachEntry { (topicPartition, partitionState) =>
val deletePartition = partitionState.deletePartition() val deletePartition = partitionState.deletePartition()
getPartition(topicPartition) match { getPartition(topicPartition) match {
@ -836,7 +835,7 @@ class ReplicaManager(val config: KafkaConfig,
val transactionalProducerInfo = mutable.HashSet[(Long, Short)]() val transactionalProducerInfo = mutable.HashSet[(Long, Short)]()
val topicPartitionBatchInfo = mutable.Map[TopicPartition, Int]() val topicPartitionBatchInfo = mutable.Map[TopicPartition, Int]()
entriesPerPartition.forKeyValue { (topicPartition, records) => entriesPerPartition.foreachEntry { (topicPartition, records) =>
// Produce requests (only requests that require verification) should only have one batch per partition in "batches" but check all just to be safe. // Produce requests (only requests that require verification) should only have one batch per partition in "batches" but check all just to be safe.
val transactionalBatches = records.batches.asScala.filter(batch => batch.hasProducerId && batch.isTransactional) val transactionalBatches = records.batches.asScala.filter(batch => batch.hasProducerId && batch.isTransactional)
transactionalBatches.foreach(batch => transactionalProducerInfo.add(batch.producerId, batch.producerEpoch)) transactionalBatches.foreach(batch => transactionalProducerInfo.add(batch.producerId, batch.producerEpoch))
@ -2203,14 +2202,14 @@ class ReplicaManager(val config: KafkaConfig,
val data = new LeaderAndIsrResponseData().setErrorCode(Errors.NONE.code) val data = new LeaderAndIsrResponseData().setErrorCode(Errors.NONE.code)
if (leaderAndIsrRequest.version < 5) { if (leaderAndIsrRequest.version < 5) {
responseMap.forKeyValue { (tp, error) => responseMap.foreachEntry { (tp, error) =>
data.partitionErrors.add(new LeaderAndIsrPartitionError() data.partitionErrors.add(new LeaderAndIsrPartitionError()
.setTopicName(tp.topic) .setTopicName(tp.topic)
.setPartitionIndex(tp.partition) .setPartitionIndex(tp.partition)
.setErrorCode(error.code)) .setErrorCode(error.code))
} }
} else { } else {
responseMap.forKeyValue { (tp, error) => responseMap.foreachEntry { (tp, error) =>
val topicId = topicIds.get(tp.topic) val topicId = topicIds.get(tp.topic)
var topic = data.topics.find(topicId) var topic = data.topics.find(topicId)
if (topic == null) { if (topic == null) {
@ -2335,7 +2334,7 @@ class ReplicaManager(val config: KafkaConfig,
s"controller $controllerId epoch $controllerEpoch as part of the become-leader transition for " + s"controller $controllerId epoch $controllerEpoch as part of the become-leader transition for " +
s"${partitionStates.size} partitions") s"${partitionStates.size} partitions")
// Update the partition information to be the leader // Update the partition information to be the leader
partitionStates.forKeyValue { (partition, partitionState) => partitionStates.foreachEntry { (partition, partitionState) =>
try { try {
if (partition.makeLeader(partitionState, highWatermarkCheckpoints, topicIds(partitionState.topicName))) { if (partition.makeLeader(partitionState, highWatermarkCheckpoints, topicIds(partitionState.topicName))) {
partitionsToMakeLeaders += partition partitionsToMakeLeaders += partition
@ -2400,7 +2399,7 @@ class ReplicaManager(val config: KafkaConfig,
highWatermarkCheckpoints: OffsetCheckpoints, highWatermarkCheckpoints: OffsetCheckpoints,
topicIds: String => Option[Uuid]) : Set[Partition] = { topicIds: String => Option[Uuid]) : Set[Partition] = {
val traceLoggingEnabled = stateChangeLogger.isTraceEnabled val traceLoggingEnabled = stateChangeLogger.isTraceEnabled
partitionStates.forKeyValue { (partition, partitionState) => partitionStates.foreachEntry { (partition, partitionState) =>
if (traceLoggingEnabled) if (traceLoggingEnabled)
stateChangeLogger.trace(s"Handling LeaderAndIsr request correlationId $correlationId from controller $controllerId " + stateChangeLogger.trace(s"Handling LeaderAndIsr request correlationId $correlationId from controller $controllerId " +
s"epoch $controllerEpoch starting the become-follower transition for partition ${partition.topicPartition} with leader " + s"epoch $controllerEpoch starting the become-follower transition for partition ${partition.topicPartition} with leader " +
@ -2410,7 +2409,7 @@ class ReplicaManager(val config: KafkaConfig,
val partitionsToMakeFollower: mutable.Set[Partition] = mutable.Set() val partitionsToMakeFollower: mutable.Set[Partition] = mutable.Set()
try { try {
partitionStates.forKeyValue { (partition, partitionState) => partitionStates.foreachEntry { (partition, partitionState) =>
val newLeaderBrokerId = partitionState.leader val newLeaderBrokerId = partitionState.leader
try { try {
if (metadataCache.hasAliveBroker(newLeaderBrokerId)) { if (metadataCache.hasAliveBroker(newLeaderBrokerId)) {
@ -2868,7 +2867,7 @@ class ReplicaManager(val config: KafkaConfig,
} }
.toSet .toSet
stateChangeLogger.info(s"Deleting ${deletes.size} partition(s).") stateChangeLogger.info(s"Deleting ${deletes.size} partition(s).")
stopPartitions(deletes).forKeyValue { (topicPartition, e) => stopPartitions(deletes).foreachEntry { (topicPartition, e) =>
if (e.isInstanceOf[KafkaStorageException]) { if (e.isInstanceOf[KafkaStorageException]) {
stateChangeLogger.error(s"Unable to delete replica $topicPartition because " + stateChangeLogger.error(s"Unable to delete replica $topicPartition because " +
"the local replica for the partition is in an offline log directory") "the local replica for the partition is in an offline log directory")
@ -2918,7 +2917,7 @@ class ReplicaManager(val config: KafkaConfig,
stateChangeLogger.info(s"Transitioning ${localLeaders.size} partition(s) to " + stateChangeLogger.info(s"Transitioning ${localLeaders.size} partition(s) to " +
"local leaders.") "local leaders.")
replicaFetcherManager.removeFetcherForPartitions(localLeaders.keySet) replicaFetcherManager.removeFetcherForPartitions(localLeaders.keySet)
localLeaders.forKeyValue { (tp, info) => localLeaders.foreachEntry { (tp, info) =>
getOrCreatePartition(tp, delta, info.topicId).foreach { case (partition, isNew) => getOrCreatePartition(tp, delta, info.topicId).foreach { case (partition, isNew) =>
try { try {
val state = info.partition.toLeaderAndIsrPartitionState(tp, isNew) val state = info.partition.toLeaderAndIsrPartitionState(tp, isNew)
@ -2953,7 +2952,7 @@ class ReplicaManager(val config: KafkaConfig,
val partitionsToStartFetching = new mutable.HashMap[TopicPartition, Partition] val partitionsToStartFetching = new mutable.HashMap[TopicPartition, Partition]
val partitionsToStopFetching = new mutable.HashMap[TopicPartition, Boolean] val partitionsToStopFetching = new mutable.HashMap[TopicPartition, Boolean]
val followerTopicSet = new mutable.HashSet[String] val followerTopicSet = new mutable.HashSet[String]
localFollowers.forKeyValue { (tp, info) => localFollowers.foreachEntry { (tp, info) =>
getOrCreatePartition(tp, delta, info.topicId).foreach { case (partition, isNew) => getOrCreatePartition(tp, delta, info.topicId).foreach { case (partition, isNew) =>
try { try {
followerTopicSet.add(tp.topic) followerTopicSet.add(tp.topic)
@ -3006,7 +3005,7 @@ class ReplicaManager(val config: KafkaConfig,
val listenerName = config.interBrokerListenerName.value val listenerName = config.interBrokerListenerName.value
val partitionAndOffsets = new mutable.HashMap[TopicPartition, InitialFetchState] val partitionAndOffsets = new mutable.HashMap[TopicPartition, InitialFetchState]
partitionsToStartFetching.forKeyValue { (topicPartition, partition) => partitionsToStartFetching.foreachEntry { (topicPartition, partition) =>
val nodeOpt = partition.leaderReplicaIdOpt val nodeOpt = partition.leaderReplicaIdOpt
.flatMap(leaderId => Option(newImage.cluster.broker(leaderId))) .flatMap(leaderId => Option(newImage.cluster.broker(leaderId)))
.flatMap(_.node(listenerName).asScala) .flatMap(_.node(listenerName).asScala)

View File

@ -22,7 +22,6 @@ import kafka.common.TopicAlreadyMarkedForDeletionException
import kafka.server.ConfigAdminManager.{prepareIncrementalConfigs, toLoggableProps} import kafka.server.ConfigAdminManager.{prepareIncrementalConfigs, toLoggableProps}
import kafka.server.metadata.ZkConfigRepository import kafka.server.metadata.ZkConfigRepository
import kafka.utils._ import kafka.utils._
import kafka.utils.Implicits._
import kafka.zk.{AdminZkClient, KafkaZkClient} import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.admin.AdminUtils import org.apache.kafka.admin.AdminUtils
import org.apache.kafka.clients.admin.{AlterConfigOp, ScramMechanism} import org.apache.kafka.clients.admin.{AlterConfigOp, ScramMechanism}
@ -963,7 +962,7 @@ class ZkAdminManager(val config: KafkaConfig,
} }
).toMap ).toMap
illegalRequestsByUser.forKeyValue { (user, errorMessage) => illegalRequestsByUser.foreachEntry { (user, errorMessage) =>
retval.results.add(new AlterUserScramCredentialsResult().setUser(user) retval.results.add(new AlterUserScramCredentialsResult().setUser(user)
.setErrorCode(if (errorMessage == unknownScramMechanismMsg) {Errors.UNSUPPORTED_SASL_MECHANISM.code} else {Errors.UNACCEPTABLE_CREDENTIAL.code}) .setErrorCode(if (errorMessage == unknownScramMechanismMsg) {Errors.UNSUPPORTED_SASL_MECHANISM.code} else {Errors.UNACCEPTABLE_CREDENTIAL.code})
.setErrorMessage(errorMessage)) } .setErrorMessage(errorMessage)) }
@ -1028,7 +1027,7 @@ class ZkAdminManager(val config: KafkaConfig,
}).collect { case (user: String, exception: Exception) => (user, exception) }.toMap }).collect { case (user: String, exception: Exception) => (user, exception) }.toMap
// report failures // report failures
usersFailedToPrepareProperties.++(usersFailedToPersist).forKeyValue { (user, exception) => usersFailedToPrepareProperties.++(usersFailedToPersist).foreachEntry { (user, exception) =>
val error = Errors.forException(exception) val error = Errors.forException(exception)
retval.results.add(new AlterUserScramCredentialsResult() retval.results.add(new AlterUserScramCredentialsResult()
.setUser(user) .setUser(user)

View File

@ -27,7 +27,6 @@ import kafka.controller.StateChangeLogger
import kafka.server.{BrokerFeatures, CachedControllerId, KRaftCachedControllerId, MetadataCache, ZkCachedControllerId} import kafka.server.{BrokerFeatures, CachedControllerId, KRaftCachedControllerId, MetadataCache, ZkCachedControllerId}
import kafka.utils.CoreUtils._ import kafka.utils.CoreUtils._
import kafka.utils.Logging import kafka.utils.Logging
import kafka.utils.Implicits._
import org.apache.kafka.admin.BrokerMetadata import org.apache.kafka.admin.BrokerMetadata
import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataPartitionState, UpdateMetadataTopicState} import org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataPartitionState, UpdateMetadataTopicState}
@ -74,7 +73,7 @@ object ZkMetadataCache {
val topicIdToNewState = new util.HashMap[Uuid, UpdateMetadataTopicState]() val topicIdToNewState = new util.HashMap[Uuid, UpdateMetadataTopicState]()
requestTopicStates.forEach(state => topicIdToNewState.put(state.topicId(), state)) requestTopicStates.forEach(state => topicIdToNewState.put(state.topicId(), state))
val newRequestTopicStates = new util.ArrayList[UpdateMetadataTopicState]() val newRequestTopicStates = new util.ArrayList[UpdateMetadataTopicState]()
currentMetadata.topicNames.forKeyValue((id, name) => { currentMetadata.topicNames.foreachEntry((id, name) => {
try { try {
Option(topicIdToNewState.get(id)) match { Option(topicIdToNewState.get(id)) match {
case None => case None =>
@ -560,7 +559,7 @@ class ZkMetadataCache(
} else { } else {
//since kafka may do partial metadata updates, we start by copying the previous state //since kafka may do partial metadata updates, we start by copying the previous state
val partitionStates = new mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataPartitionState]](metadataSnapshot.partitionStates.size) val partitionStates = new mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataPartitionState]](metadataSnapshot.partitionStates.size)
metadataSnapshot.partitionStates.forKeyValue { (topic, oldPartitionStates) => metadataSnapshot.partitionStates.foreachEntry { (topic, oldPartitionStates) =>
val copy = new mutable.LongMap[UpdateMetadataPartitionState](oldPartitionStates.size) val copy = new mutable.LongMap[UpdateMetadataPartitionState](oldPartitionStates.size)
copy ++= oldPartitionStates copy ++= oldPartitionStates
partitionStates(topic) = copy partitionStates(topic) = copy

View File

@ -23,7 +23,6 @@ import java.io._
import com.fasterxml.jackson.databind.node.{IntNode, JsonNodeFactory, ObjectNode, TextNode} import com.fasterxml.jackson.databind.node.{IntNode, JsonNodeFactory, ObjectNode, TextNode}
import kafka.coordinator.transaction.TransactionLog import kafka.coordinator.transaction.TransactionLog
import kafka.log._ import kafka.log._
import kafka.utils.Implicits._
import kafka.utils.{CoreUtils, VerifiableProperties} import kafka.utils.{CoreUtils, VerifiableProperties}
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
import org.apache.kafka.common.message.ConsumerProtocolAssignment import org.apache.kafka.common.message.ConsumerProtocolAssignment
@ -90,7 +89,7 @@ object DumpLogSegments {
} }
} }
misMatchesForIndexFilesMap.forKeyValue { (fileName, listOfMismatches) => misMatchesForIndexFilesMap.foreachEntry { (fileName, listOfMismatches) =>
System.err.println(s"Mismatches in :$fileName") System.err.println(s"Mismatches in :$fileName")
listOfMismatches.foreach { case (indexOffset, logOffset) => listOfMismatches.foreach { case (indexOffset, logOffset) =>
System.err.println(s" Index offset: $indexOffset, log offset: $logOffset") System.err.println(s" Index offset: $indexOffset, log offset: $logOffset")
@ -99,7 +98,7 @@ object DumpLogSegments {
timeIndexDumpErrors.printErrors() timeIndexDumpErrors.printErrors()
nonConsecutivePairsForLogFilesMap.forKeyValue { (fileName, listOfNonConsecutivePairs) => nonConsecutivePairsForLogFilesMap.foreachEntry { (fileName, listOfNonConsecutivePairs) =>
System.err.println(s"Non-consecutive offsets in $fileName") System.err.println(s"Non-consecutive offsets in $fileName")
listOfNonConsecutivePairs.foreach { case (first, second) => listOfNonConsecutivePairs.foreach { case (first, second) =>
System.err.println(s" $first is followed by $second") System.err.println(s" $first is followed by $second")

View File

@ -36,7 +36,6 @@ import org.apache.kafka.network.SocketServerConfigs
import org.slf4j.event.Level import org.slf4j.event.Level
import java.util import java.util
import scala.annotation.nowarn
import scala.jdk.CollectionConverters._ import scala.jdk.CollectionConverters._
/** /**
@ -260,13 +259,6 @@ object CoreUtils {
} }
} }
@nowarn("cat=unused") // see below for explanation
def groupMapReduce[T, K, B](elements: Iterable[T])(key: T => K)(f: T => B)(reduce: (B, B) => B): Map[K, B] = {
// required for Scala 2.12 compatibility, unused in Scala 2.13 and hence we need to suppress the unused warning
import scala.collection.compat._
elements.groupMapReduce(key)(f)(reduce)
}
def replicaToBrokerAssignmentAsScala(map: util.Map[Integer, util.List[Integer]]): Map[Int, Seq[Int]] = { def replicaToBrokerAssignmentAsScala(map: util.Map[Integer, util.List[Integer]]): Map[Int, Seq[Int]] = {
map.asScala.map(e => (e._1.asInstanceOf[Int], e._2.asScala.map(_.asInstanceOf[Int]))) map.asScala.map(e => (e._1.asInstanceOf[Int], e._2.asScala.map(_.asInstanceOf[Int])))
} }

View File

@ -20,7 +20,6 @@ package kafka.utils
import java.util import java.util
import java.util.Properties import java.util.Properties
import scala.annotation.nowarn
import scala.jdk.CollectionConverters._ import scala.jdk.CollectionConverters._
/** /**
@ -46,22 +45,4 @@ object Implicits {
(properties: util.Hashtable[AnyRef, AnyRef]).putAll(map.asJava) (properties: util.Hashtable[AnyRef, AnyRef]).putAll(map.asJava)
} }
/**
* Exposes `forKeyValue` which maps to `foreachEntry` in Scala 2.13 and `foreach` in Scala 2.12
* (with the help of scala.collection.compat). `foreachEntry` avoids the tuple allocation and
* is more efficient.
*
* This was not named `foreachEntry` to avoid `unused import` warnings in Scala 2.13 (the implicit
* would not be triggered in Scala 2.13 since `Map.foreachEntry` would have precedence).
*/
@nowarn("cat=unused-imports")
implicit class MapExtensionMethods[K, V](private val self: scala.collection.Map[K, V]) extends AnyVal {
import scala.collection.compat._
def forKeyValue[U](f: (K, V) => U): Unit = {
self.foreachEntry { (k, v) => f(k, v) }
}
}
} }

View File

@ -17,10 +17,8 @@
package kafka.utils.json package kafka.utils.json
import scala.collection.{Map, Seq} import scala.collection.{Factory, Map, Seq}
import scala.collection.compat._
import scala.jdk.CollectionConverters._ import scala.jdk.CollectionConverters._
import com.fasterxml.jackson.databind.{JsonMappingException, JsonNode} import com.fasterxml.jackson.databind.{JsonMappingException, JsonNode}
/** /**

View File

@ -22,7 +22,6 @@ import kafka.common.TopicAlreadyMarkedForDeletionException
import kafka.controller.ReplicaAssignment import kafka.controller.ReplicaAssignment
import kafka.server.{DynamicConfig, KafkaConfig} import kafka.server.{DynamicConfig, KafkaConfig}
import kafka.utils._ import kafka.utils._
import kafka.utils.Implicits._
import org.apache.kafka.admin.{AdminUtils, BrokerMetadata} import org.apache.kafka.admin.{AdminUtils, BrokerMetadata}
import org.apache.kafka.common.{TopicPartition, Uuid} import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.common.errors._ import org.apache.kafka.common.errors._
@ -311,7 +310,7 @@ class AdminZkClient(zkClient: KafkaZkClient,
expectedReplicationFactor: Int, expectedReplicationFactor: Int,
availableBrokerIds: Set[Int]): Unit = { availableBrokerIds: Set[Int]): Unit = {
replicaAssignment.forKeyValue { (partitionId, replicas) => replicaAssignment.foreachEntry { (partitionId, replicas) =>
if (replicas.isEmpty) if (replicas.isEmpty)
throw new InvalidReplicaAssignmentException( throw new InvalidReplicaAssignmentException(
s"Cannot have replication factor of 0 for partition id $partitionId.") s"Cannot have replication factor of 0 for partition id $partitionId.")

View File

@ -79,8 +79,8 @@ import java.util.function.Function;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
import scala.collection.JavaConverters;
import scala.collection.Seq; import scala.collection.Seq;
import scala.jdk.javaapi.CollectionConverters;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertFalse;
@ -1004,7 +1004,6 @@ public class ConfigCommandTest {
}); });
} }
@SuppressWarnings("deprecation") // Added for Scala 2.12 compatibility for usages of JavaConverters
@ParameterizedTest @ParameterizedTest
@ValueSource(booleans = {true, false}) @ValueSource(booleans = {true, false})
public void shouldAlterTopicConfig(boolean file) { public void shouldAlterTopicConfig(boolean file) {
@ -1013,7 +1012,7 @@ public class ConfigCommandTest {
addedConfigs.put("delete.retention.ms", "1000000"); addedConfigs.put("delete.retention.ms", "1000000");
addedConfigs.put("min.insync.replicas", "2"); addedConfigs.put("min.insync.replicas", "2");
if (file) { if (file) {
File f = kafka.utils.TestUtils.tempPropertiesFile(JavaConverters.mapAsScalaMap(addedConfigs)); File f = kafka.utils.TestUtils.tempPropertiesFile(CollectionConverters.asScala(addedConfigs));
filePath = f.getPath(); filePath = f.getPath();
} }
@ -2292,8 +2291,7 @@ public class ConfigCommandTest {
} }
} }
@SuppressWarnings({"deprecation"})
private <T> Seq<T> seq(Collection<T> seq) { private <T> Seq<T> seq(Collection<T> seq) {
return JavaConverters.asScalaIteratorConverter(seq.iterator()).asScala().toSeq(); return CollectionConverters.asScala(seq).toSeq();
} }
} }

View File

@ -26,8 +26,7 @@ import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{Arguments, MethodSource} import org.junit.jupiter.params.provider.{Arguments, MethodSource}
import java.util import java.util.{Properties, stream}
import java.util.Properties
import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicInteger
import scala.jdk.CollectionConverters._ import scala.jdk.CollectionConverters._
import scala.collection.Seq import scala.collection.Seq
@ -117,38 +116,35 @@ object BaseConsumerTest {
// * KRaft and the classic group protocol // * KRaft and the classic group protocol
// * KRaft and the consumer group protocol // * KRaft and the consumer group protocol
def getTestQuorumAndGroupProtocolParametersAll() : java.util.stream.Stream[Arguments] = { def getTestQuorumAndGroupProtocolParametersAll() : java.util.stream.Stream[Arguments] = {
util.Arrays.stream(Array( stream.Stream.of(
Arguments.of("zk", "classic"), Arguments.of("zk", "classic"),
Arguments.of("kraft", "classic"), Arguments.of("kraft", "classic"),
Arguments.of("kraft", "consumer") Arguments.of("kraft", "consumer")
)) )
} }
// In Scala 2.12, it is necessary to disambiguate the java.util.stream.Stream.of() method call def getTestQuorumAndGroupProtocolParametersZkOnly() : java.util.stream.Stream[Arguments] = {
// in the case where there's only a single Arguments in the list. The following commented-out stream.Stream.of(
// method works in Scala 2.13, but not 2.12. For this reason, tests which run against just a Arguments.of("zk", "classic")
// single combination are written using @CsvSource rather than the more elegant @MethodSource. )
// def getTestQuorumAndGroupProtocolParametersZkOnly() : java.util.stream.Stream[Arguments] = { }
// java.util.stream.Stream.of(
// Arguments.of("zk", "classic"))
// }
// For tests that only work with the classic group protocol, we want to test the following combinations: // For tests that only work with the classic group protocol, we want to test the following combinations:
// * ZooKeeper and the classic group protocol // * ZooKeeper and the classic group protocol
// * KRaft and the classic group protocol // * KRaft and the classic group protocol
def getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly() : java.util.stream.Stream[Arguments] = { def getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly() : java.util.stream.Stream[Arguments] = {
util.Arrays.stream(Array( stream.Stream.of(
Arguments.of("zk", "classic"), Arguments.of("zk", "classic"),
Arguments.of("kraft", "classic") Arguments.of("kraft", "classic")
)) )
} }
// For tests that only work with the consumer group protocol, we want to test the following combination: // For tests that only work with the consumer group protocol, we want to test the following combination:
// * KRaft and the consumer group protocol // * KRaft and the consumer group protocol
def getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly(): java.util.stream.Stream[Arguments] = { def getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly(): stream.Stream[Arguments] = {
util.Arrays.stream(Array( stream.Stream.of(
Arguments.of("kraft", "consumer") Arguments.of("kraft", "consumer")
)) )
} }
val updateProducerCount = new AtomicInteger() val updateProducerCount = new AtomicInteger()

View File

@ -25,7 +25,6 @@ import org.apache.kafka.common.PartitionInfo
import java.util.stream.Stream import java.util.stream.Stream
import scala.jdk.CollectionConverters._ import scala.jdk.CollectionConverters._
import scala.collection.mutable import scala.collection.mutable
import org.junit.jupiter.params.provider.CsvSource
/** /**
* Integration tests for the consumer that covers logic related to manual assignment. * Integration tests for the consumer that covers logic related to manual assignment.
@ -137,9 +136,7 @@ class PlaintextConsumerAssignTest extends AbstractConsumerTest {
// partitionsFor not implemented in consumer group protocol and this test requires ZK also // partitionsFor not implemented in consumer group protocol and this test requires ZK also
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@CsvSource(Array( @MethodSource(Array("getTestQuorumAndGroupProtocolParametersZkOnly"))
"zk, classic"
))
def testAssignAndConsumeWithLeaderChangeValidatingPositions(quorum: String, groupProtocol: String): Unit = { def testAssignAndConsumeWithLeaderChangeValidatingPositions(quorum: String, groupProtocol: String): Unit = {
val numRecords = 10 val numRecords = 10
val producer = createProducer() val producer = createProducer()
@ -243,4 +240,7 @@ class PlaintextConsumerAssignTest extends AbstractConsumerTest {
object PlaintextConsumerAssignTest { object PlaintextConsumerAssignTest {
def getTestQuorumAndGroupProtocolParametersAll: Stream[Arguments] = def getTestQuorumAndGroupProtocolParametersAll: Stream[Arguments] =
BaseConsumerTest.getTestQuorumAndGroupProtocolParametersAll() BaseConsumerTest.getTestQuorumAndGroupProtocolParametersAll()
def getTestQuorumAndGroupProtocolParametersZkOnly: Stream[Arguments] =
BaseConsumerTest.getTestQuorumAndGroupProtocolParametersZkOnly()
} }

View File

@ -23,7 +23,6 @@ import java.nio.ByteBuffer
import java.util.{Collections, Optional, Properties} import java.util.{Collections, Optional, Properties}
import kafka.network.SocketServer import kafka.network.SocketServer
import kafka.security.JaasTestUtils import kafka.security.JaasTestUtils
import kafka.utils.Implicits._
import org.apache.kafka.clients.admin.{Admin, NewTopic} import org.apache.kafka.clients.admin.{Admin, NewTopic}
import org.apache.kafka.common.network.{ConnectionMode, ListenerName} import org.apache.kafka.common.network.{ConnectionMode, ListenerName}
import org.apache.kafka.common.protocol.ApiKeys import org.apache.kafka.common.protocol.ApiKeys
@ -120,7 +119,7 @@ object IntegrationTestUtils {
replicaAssignment: Map[Int, Seq[Int]] replicaAssignment: Map[Int, Seq[Int]]
): Unit = { ): Unit = {
val javaAssignment = new java.util.HashMap[Integer, java.util.List[Integer]]() val javaAssignment = new java.util.HashMap[Integer, java.util.List[Integer]]()
replicaAssignment.forKeyValue { (partitionId, assignment) => replicaAssignment.foreachEntry { (partitionId, assignment) =>
javaAssignment.put(partitionId, assignment.map(Int.box).asJava) javaAssignment.put(partitionId, assignment.map(Int.box).asJava)
} }
val newTopic = new NewTopic(topic, javaAssignment) val newTopic = new NewTopic(topic, javaAssignment)

View File

@ -24,7 +24,6 @@ import java.util.{Collections, Properties}
import com.yammer.metrics.core.Meter import com.yammer.metrics.core.Meter
import kafka.network.Processor.ListenerMetricTag import kafka.network.Processor.ListenerMetricTag
import kafka.server.KafkaConfig import kafka.server.KafkaConfig
import kafka.utils.Implicits.MapExtensionMethods
import kafka.utils.TestUtils import kafka.utils.TestUtils
import org.apache.kafka.common.config.ConfigException import org.apache.kafka.common.config.ConfigException
import org.apache.kafka.common.metrics.internals.MetricsUtils import org.apache.kafka.common.metrics.internals.MetricsUtils
@ -760,7 +759,7 @@ class ConnectionQuotasTest {
"Expected broker-connection-accept-rate metric to exist") "Expected broker-connection-accept-rate metric to exist")
// add listeners and verify connection limits not exceeded // add listeners and verify connection limits not exceeded
listeners.forKeyValue { (name, listener) => listeners.foreachEntry { (name, listener) =>
val listenerName = listener.listenerName val listenerName = listener.listenerName
connectionQuotas.addListener(config, listenerName) connectionQuotas.addListener(config, listenerName)
connectionQuotas.maxConnectionsPerListener(listenerName).configure(listenerConfig) connectionQuotas.maxConnectionsPerListener(listenerName).configure(listenerConfig)
@ -785,14 +784,14 @@ class ConnectionQuotasTest {
} }
private def verifyNoBlockedPercentRecordedOnAllListeners(): Unit = { private def verifyNoBlockedPercentRecordedOnAllListeners(): Unit = {
blockedPercentMeters.forKeyValue { (name, meter) => blockedPercentMeters.foreachEntry { (name, meter) =>
assertEquals(0, meter.count(), assertEquals(0, meter.count(),
s"BlockedPercentMeter metric for $name listener") s"BlockedPercentMeter metric for $name listener")
} }
} }
private def verifyNonZeroBlockedPercentAndThrottleTimeOnAllListeners(): Unit = { private def verifyNonZeroBlockedPercentAndThrottleTimeOnAllListeners(): Unit = {
blockedPercentMeters.forKeyValue { (name, meter) => blockedPercentMeters.foreachEntry { (name, meter) =>
assertTrue(meter.count() > 0, assertTrue(meter.count() > 0,
s"Expected BlockedPercentMeter metric for $name listener to be recorded") s"Expected BlockedPercentMeter metric for $name listener to be recorded")
} }
@ -808,7 +807,7 @@ class ConnectionQuotasTest {
} }
private def verifyOnlyNonInterBrokerListenersBlockedPercentRecorded(): Unit = { private def verifyOnlyNonInterBrokerListenersBlockedPercentRecorded(): Unit = {
blockedPercentMeters.forKeyValue { (name, meter) => blockedPercentMeters.foreachEntry { (name, meter) =>
name match { name match {
case "REPLICATION" => case "REPLICATION" =>
assertEquals(0, meter.count(), s"BlockedPercentMeter metric for $name listener") assertEquals(0, meter.count(), s"BlockedPercentMeter metric for $name listener")

View File

@ -19,7 +19,6 @@ package kafka.server
import com.yammer.metrics.core.Gauge import com.yammer.metrics.core.Gauge
import kafka.cluster.BrokerEndPoint import kafka.cluster.BrokerEndPoint
import kafka.server.AbstractFetcherThread.{ReplicaFetch, ResultWithPartitions} import kafka.server.AbstractFetcherThread.{ReplicaFetch, ResultWithPartitions}
import kafka.utils.Implicits.MapExtensionMethods
import kafka.utils.TestUtils import kafka.utils.TestUtils
import org.apache.kafka.common.message.FetchResponseData.PartitionData import org.apache.kafka.common.message.FetchResponseData.PartitionData
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
@ -255,7 +254,7 @@ class AbstractFetcherManagerTest {
fetcherManager.resizeThreadPool(newFetcherSize) fetcherManager.resizeThreadPool(newFetcherSize)
val ownedPartitions = mutable.Set.empty[TopicPartition] val ownedPartitions = mutable.Set.empty[TopicPartition]
fetcherManager.fetcherThreadMap.forKeyValue { (brokerIdAndFetcherId, fetcherThread) => fetcherManager.fetcherThreadMap.foreachEntry { (brokerIdAndFetcherId, fetcherThread) =>
val fetcherId = brokerIdAndFetcherId.fetcherId val fetcherId = brokerIdAndFetcherId.fetcherId
val brokerId = brokerIdAndFetcherId.brokerId val brokerId = brokerIdAndFetcherId.brokerId

View File

@ -18,7 +18,6 @@
package kafka.server package kafka.server
import com.yammer.metrics.core.{Histogram, Meter} import com.yammer.metrics.core.{Histogram, Meter}
import kafka.utils.Implicits.MapExtensionMethods
import kafka.utils.TestUtils import kafka.utils.TestUtils
import org.apache.kafka.clients.{ClientResponse, NetworkClient} import org.apache.kafka.clients.{ClientResponse, NetworkClient}
import org.apache.kafka.common.errors.{AuthenticationException, SaslAuthenticationException, UnsupportedVersionException} import org.apache.kafka.common.errors.{AuthenticationException, SaslAuthenticationException, UnsupportedVersionException}
@ -92,7 +91,7 @@ class AddPartitionsToTxnManagerTest {
} }
private def setErrors(errors: mutable.Map[TopicPartition, Errors])(callbackErrors: Map[TopicPartition, Errors]): Unit = { private def setErrors(errors: mutable.Map[TopicPartition, Errors])(callbackErrors: Map[TopicPartition, Errors]): Unit = {
callbackErrors.forKeyValue(errors.put) callbackErrors.foreachEntry(errors.put)
} }
@Test @Test

View File

@ -20,7 +20,6 @@ package kafka.server
import kafka.cluster.BrokerEndPoint import kafka.cluster.BrokerEndPoint
import kafka.server.AbstractFetcherThread.ReplicaFetch import kafka.server.AbstractFetcherThread.ReplicaFetch
import kafka.server.AbstractFetcherThread.ResultWithPartitions import kafka.server.AbstractFetcherThread.ResultWithPartitions
import kafka.utils.Implicits.MapExtensionMethods
import org.apache.kafka.common.message.FetchResponseData import org.apache.kafka.common.message.FetchResponseData
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.protocol.{ApiKeys, Errors}
@ -139,7 +138,7 @@ class MockLeaderEndPoint(sourceBroker: BrokerEndPoint = new BrokerEndPoint(1, ho
override def fetchEpochEndOffsets(partitions: Map[TopicPartition, EpochData]): Map[TopicPartition, EpochEndOffset] = { override def fetchEpochEndOffsets(partitions: Map[TopicPartition, EpochData]): Map[TopicPartition, EpochEndOffset] = {
val endOffsets = mutable.Map[TopicPartition, EpochEndOffset]() val endOffsets = mutable.Map[TopicPartition, EpochEndOffset]()
partitions.forKeyValue { (partition, epochData) => partitions.foreachEntry { (partition, epochData) =>
assert(partition.partition == epochData.partition, assert(partition.partition == epochData.partition,
"Partition must be consistent between TopicPartition and EpochData") "Partition must be consistent between TopicPartition and EpochData")
val leaderState = leaderPartitionState(partition) val leaderState = leaderPartitionState(partition)

View File

@ -1314,7 +1314,6 @@ class ReplicaFetcherThreadTest {
when(partition.localLogOrException).thenReturn(log) when(partition.localLogOrException).thenReturn(log)
when(partition.appendRecordsToFollowerOrFutureReplica(any[MemoryRecords], any[Boolean])).thenReturn(appendInfo) when(partition.appendRecordsToFollowerOrFutureReplica(any[MemoryRecords], any[Boolean])).thenReturn(appendInfo)
// In Scala 2.12, the partitionsWithNewHighWatermark buffer is cleared before the replicaManager mock is verified.
// Capture the argument at the time of invocation. // Capture the argument at the time of invocation.
val completeDelayedFetchRequestsArgument = mutable.Buffer.empty[TopicPartition] val completeDelayedFetchRequestsArgument = mutable.Buffer.empty[TopicPartition]
val replicaManager: ReplicaManager = mock(classOf[ReplicaManager]) val replicaManager: ReplicaManager = mock(classOf[ReplicaManager])

View File

@ -19,7 +19,6 @@ package kafka.server.epoch
import kafka.cluster.BrokerEndPoint import kafka.cluster.BrokerEndPoint
import kafka.server.KafkaConfig._ import kafka.server.KafkaConfig._
import kafka.server._ import kafka.server._
import kafka.utils.Implicits._
import kafka.utils.TestUtils._ import kafka.utils.TestUtils._
import kafka.utils.{Logging, TestUtils} import kafka.utils.{Logging, TestUtils}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
@ -311,7 +310,7 @@ class LeaderEpochIntegrationTest extends QuorumTestHarness with Logging {
def leaderOffsetsFor(partitions: Map[TopicPartition, Int]): Map[TopicPartition, EpochEndOffset] = { def leaderOffsetsFor(partitions: Map[TopicPartition, Int]): Map[TopicPartition, EpochEndOffset] = {
val topics = new OffsetForLeaderTopicCollection(partitions.size) val topics = new OffsetForLeaderTopicCollection(partitions.size)
partitions.forKeyValue { (topicPartition, leaderEpoch) => partitions.foreachEntry { (topicPartition, leaderEpoch) =>
var topic = topics.find(topicPartition.topic) var topic = topics.find(topicPartition.topic)
if (topic == null) { if (topic == null) {
topic = new OffsetForLeaderTopic().setTopic(topicPartition.topic) topic = new OffsetForLeaderTopic().setTopic(topicPartition.topic)

View File

@ -421,7 +421,7 @@ object TestUtils extends Logging {
topic, numPartitions, replicationFactor.toShort).configs(configsMap))) topic, numPartitions, replicationFactor.toShort).configs(configsMap)))
} else { } else {
val assignment = new util.HashMap[Integer, util.List[Integer]]() val assignment = new util.HashMap[Integer, util.List[Integer]]()
replicaAssignment.forKeyValue { case (k, v) => replicaAssignment.foreachEntry { case (k, v) =>
val replicas = new util.ArrayList[Integer] val replicas = new util.ArrayList[Integer]
v.foreach(r => replicas.add(r.asInstanceOf[Integer])) v.foreach(r => replicas.add(r.asInstanceOf[Integer]))
assignment.put(k.asInstanceOf[Integer], replicas) assignment.put(k.asInstanceOf[Integer], replicas)

View File

@ -86,6 +86,10 @@
</li> </li>
<li>Other changes: <li>Other changes:
<ul> <ul>
<li>
Scala 2.12 support has been removed in Apache Kafka 4.0
See <a href="https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181308218">KIP-751</a> for more details
</li>
<li>The <code>--delete-config</code> option in the <code>kafka-topics</code> command line tool has been deprecated. <li>The <code>--delete-config</code> option in the <code>kafka-topics</code> command line tool has been deprecated.
</li> </li>
</ul> </ul>

View File

@ -20,29 +20,22 @@
ext { ext {
versions = [:] versions = [:]
libs = [:] libs = [:]
// Available if -PscalaVersion is used. This is useful when we want to support a Scala version that has
// a higher minimum Java requirement than Kafka. This was previously the case for Scala 2.12 and Java 7.
availableScalaVersions = [ '2.12', '2.13' ]
} }
// Add Scala version // Add Scala version
def defaultScala212Version = '2.12.19'
def defaultScala213Version = '2.13.15' def defaultScala213Version = '2.13.15'
if (hasProperty('scalaVersion')) { if (hasProperty('scalaVersion')) {
if (scalaVersion == '2.12') { if (scalaVersion == '2.13') {
versions["scala"] = defaultScala212Version
} else if (scalaVersion == '2.13') {
versions["scala"] = defaultScala213Version versions["scala"] = defaultScala213Version
} else { } else {
versions["scala"] = scalaVersion versions["scala"] = scalaVersion
} }
} else { } else {
versions["scala"] = defaultScala212Version versions["scala"] = defaultScala213Version
} }
/* Resolve base Scala version according to these patterns: /* Resolve base Scala version according to these patterns:
1. generally available Scala versions (such as: 2.12.y and 2.13.z) corresponding base versions will be: 2.12 and 2.13 (respectively) 1. generally available Scala versions (such as: 2.13.z) corresponding base versions will be: 2.13 (respectively)
2. pre-release Scala versions (i.e. milestone/rc, such as: 2.13.0-M5, 2.13.0-RC1, 2.14.0-M1, etc.) will have identical base versions; 2. pre-release Scala versions (i.e. milestone/rc, such as: 2.13.0-M5, 2.13.0-RC1, 2.14.0-M1, etc.) will have identical base versions;
rationale: pre-release Scala versions are not binary compatible with each other and that's the reason why libraries include the full rationale: pre-release Scala versions are not binary compatible with each other and that's the reason why libraries include the full
Scala release string in their name for pre-releases (see dependencies below with an artifact name suffix '_$versions.baseScala') Scala release string in their name for pre-releases (see dependencies below with an artifact name suffix '_$versions.baseScala')
@ -54,13 +47,9 @@ if ( !versions.scala.contains('-') ) {
} }
// mockito >= 5.5 is required for Java 21 and mockito 5.x requires at least Java 11 // mockito >= 5.5 is required for Java 21 and mockito 5.x requires at least Java 11
// mockito 4.9 is required for Scala 2.12 as a workaround for compiler errors due to ambiguous reference to `Mockito.spy`
// since Scala 2.12 support is going away soon, this is simpler than adjusting the code.
// mockito 4.11 is used with Java 8 and Scala 2.13 // mockito 4.11 is used with Java 8 and Scala 2.13
String mockitoVersion String mockitoVersion
if (scalaVersion == "2.12") if (JavaVersion.current().isCompatibleWith(JavaVersion.VERSION_11))
mockitoVersion = "4.9.0"
else if (JavaVersion.current().isCompatibleWith(JavaVersion.VERSION_11))
mockitoVersion = "5.10.0" mockitoVersion = "5.10.0"
else else
mockitoVersion = "4.11.0" mockitoVersion = "4.11.0"
@ -146,7 +135,6 @@ versions += [
pcollections: "4.0.1", pcollections: "4.0.1",
reload4j: "1.2.25", reload4j: "1.2.25",
rocksDB: "7.9.2", rocksDB: "7.9.2",
scalaCollectionCompat: "2.10.0",
// When updating the scalafmt version please also update the version field in checkstyle/.scalafmt.conf. scalafmt now // When updating the scalafmt version please also update the version field in checkstyle/.scalafmt.conf. scalafmt now
// has the version field as mandatory in its configuration, see // has the version field as mandatory in its configuration, see
// https://github.com/scalameta/scalafmt/releases/tag/v3.1.0. // https://github.com/scalameta/scalafmt/releases/tag/v3.1.0.
@ -246,7 +234,6 @@ libs += [
opentelemetryProto: "io.opentelemetry.proto:opentelemetry-proto:$versions.opentelemetryProto", opentelemetryProto: "io.opentelemetry.proto:opentelemetry-proto:$versions.opentelemetryProto",
reload4j: "ch.qos.reload4j:reload4j:$versions.reload4j", reload4j: "ch.qos.reload4j:reload4j:$versions.reload4j",
rocksDBJni: "org.rocksdb:rocksdbjni:$versions.rocksDB", rocksDBJni: "org.rocksdb:rocksdbjni:$versions.rocksDB",
scalaCollectionCompat: "org.scala-lang.modules:scala-collection-compat_$versions.baseScala:$versions.scalaCollectionCompat",
scalaJava8Compat: "org.scala-lang.modules:scala-java8-compat_$versions.baseScala:$versions.scalaJava8Compat", scalaJava8Compat: "org.scala-lang.modules:scala-java8-compat_$versions.baseScala:$versions.scalaJava8Compat",
scalaLibrary: "org.scala-lang:scala-library:$versions.scala", scalaLibrary: "org.scala-lang:scala-library:$versions.scala",
scalaLogging: "com.typesafe.scala-logging:scala-logging_$versions.baseScala:$versions.scalaLogging", scalaLogging: "com.typesafe.scala-logging:scala-logging_$versions.baseScala:$versions.scalaLogging",

View File

@ -184,68 +184,12 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read
<Bug pattern="BX_UNBOXING_IMMEDIATELY_REBOXED"/> <Bug pattern="BX_UNBOXING_IMMEDIATELY_REBOXED"/>
</Match> </Match>
<Match>
<!-- Uncallable anonymous methods are left behind after inlining by scalac 2.12, fixed in 2.13 -->
<Source name="AdminUtils.scala"/>
<Package name="kafka.admin"/>
<Bug pattern="UMAC_UNCALLABLE_METHOD_OF_ANONYMOUS_CLASS"/>
</Match>
<Match>
<!-- Uncallable anonymous methods are left behind after inlining by scalac 2.12, fixed in 2.13 -->
<Source name="ControllerContext.scala"/>
<Package name="kafka.controller"/>
<Bug pattern="UMAC_UNCALLABLE_METHOD_OF_ANONYMOUS_CLASS"/>
</Match>
<Match> <Match>
<!-- offsets is a lazy val and it confuses spotBugs with its locking scheme --> <!-- offsets is a lazy val and it confuses spotBugs with its locking scheme -->
<Class name="kafka.server.checkpoints.LazyOffsetCheckpointMap"/> <Class name="kafka.server.checkpoints.LazyOffsetCheckpointMap"/>
<Bug pattern="IS2_INCONSISTENT_SYNC"/> <Bug pattern="IS2_INCONSISTENT_SYNC"/>
</Match> </Match>
<Match>
<!-- Uncallable anonymous methods are left behind after inlining by scalac 2.12, fixed in 2.13 -->
<Source name="ReplicaManager.scala"/>
<Package name="kafka.server"/>
<Bug pattern="UMAC_UNCALLABLE_METHOD_OF_ANONYMOUS_CLASS"/>
</Match>
<Match>
<!-- Uncallable anonymous methods are left behind after inlining by scalac 2.12, fixed in 2.13 -->
<Source name="LogManager.scala"/>
<Package name="kafka.log"/>
<Bug pattern="UMAC_UNCALLABLE_METHOD_OF_ANONYMOUS_CLASS"/>
</Match>
<Match>
<!-- Uncallable anonymous methods are left behind after inlining by scalac 2.12, fixed in 2.13 -->
<Source name="DelayedElectLeader.scala"/>
<Package name="kafka.server"/>
<Bug pattern="UMAC_UNCALLABLE_METHOD_OF_ANONYMOUS_CLASS"/>
</Match>
<Match>
<!-- Uncallable anonymous methods are left behind after inlining by scalac 2.12, fixed in 2.13 -->
<Source name="AdminZkClient.scala"/>
<Package name="kafka.zk"/>
<Bug pattern="UMAC_UNCALLABLE_METHOD_OF_ANONYMOUS_CLASS"/>
</Match>
<Match>
<!-- Uncallable anonymous methods are left behind after inlining by scalac 2.12, fixed in 2.13 -->
<Source name="AuthHelper.scala"/>
<Package name="kafka.server"/>
<Bug pattern="UMAC_UNCALLABLE_METHOD_OF_ANONYMOUS_CLASS"/>
</Match>
<Match>
<!-- Uncallable anonymous methods are left behind after inlining by scalac 2.12, fixed in 2.13 -->
<Source name="KafkaApis.scala"/>
<Package name="kafka.server"/>
<Bug pattern="UMAC_UNCALLABLE_METHOD_OF_ANONYMOUS_CLASS"/>
</Match>
<Match> <Match>
<!-- Keeping this class for compatibility. It's deprecated and will be removed in the next major release --> <!-- Keeping this class for compatibility. It's deprecated and will be removed in the next major release -->
<Source name="MessageFormatter.scala"/> <Source name="MessageFormatter.scala"/>

View File

@ -17,5 +17,9 @@
# Convenient way to invoke a gradle command with all Scala versions supported # Convenient way to invoke a gradle command with all Scala versions supported
# by default # by default
./gradlew "$@" -PscalaVersion=2.12 && ./gradlew "$@" -PscalaVersion=2.13 # This script was originally designed to support multiple Scala versions (2.12 and 2.13),
# but as Scala 2.12 is no longer supported, this script is no longer necessary.
# We are keeping it for backwards compatibility. It will be removed in a future release.
echo "Warning: This script is deprecated and will be removed in a future release."
./gradlew "$@" -PscalaVersion=2.13

View File

@ -189,4 +189,23 @@ public class ConcurrentMapBenchmark {
} }
} }
} }
@Benchmark
@OperationsPerInvocation(TIMES)
public void testConcurrentHashMapComputeIfAbsentReadOnly(Blackhole blackhole) {
for (int i = 0; i < TIMES; i++) {
blackhole.consume(concurrentHashMap.computeIfAbsent(
ThreadLocalRandom.current().nextInt(0, mapSize),
newValue -> Integer.MAX_VALUE
));
}
}
@Benchmark
@OperationsPerInvocation(TIMES)
public void testConcurrentHashMapGetReadOnly(Blackhole blackhole) {
for (int i = 0; i < TIMES; i++) {
blackhole.consume(concurrentHashMap.get(ThreadLocalRandom.current().nextInt(0, mapSize)));
}
}
} }

View File

@ -326,7 +326,7 @@ except Exception as e:
git.targz(rc_tag, f"kafka-{release_version}-src/", f"{artifacts_dir}/kafka-{release_version}-src.tgz") git.targz(rc_tag, f"kafka-{release_version}-src/", f"{artifacts_dir}/kafka-{release_version}-src.tgz")
cmd("Building artifacts", "./gradlew clean && ./gradlewAll releaseTarGz", cwd=kafka_dir, env=jdk8_env, shell=True) cmd("Building artifacts", "./gradlew clean && ./gradlew releaseTarGz -PscalaVersion=2.13", cwd=kafka_dir, env=jdk8_env, shell=True)
cmd("Copying artifacts", f"cp {kafka_dir}/core/build/distributions/* {artifacts_dir}", shell=True) cmd("Copying artifacts", f"cp {kafka_dir}/core/build/distributions/* {artifacts_dir}", shell=True)
cmd("Building docs", "./gradlew clean aggregatedJavadoc", cwd=kafka_dir, env=jdk17_env) cmd("Building docs", "./gradlew clean aggregatedJavadoc", cwd=kafka_dir, env=jdk17_env)
cmd("Copying docs", f"cp -R {kafka_dir}/build/docs/javadoc {artifacts_dir}") cmd("Copying docs", f"cp -R {kafka_dir}/build/docs/javadoc {artifacts_dir}")
@ -351,7 +351,7 @@ cmd("Zipping artifacts", f"tar -czf {artifact_name}.tar.gz {artifact_name}", cwd
sftp.upload_artifacts(apache_id, artifacts_dir) sftp.upload_artifacts(apache_id, artifacts_dir)
confirm_or_fail("Going to build and upload mvn artifacts based on these settings:\n" + textfiles.read(global_gradle_props) + '\nOK?') confirm_or_fail("Going to build and upload mvn artifacts based on these settings:\n" + textfiles.read(global_gradle_props) + '\nOK?')
cmd("Building and uploading archives", "./gradlewAll publish", cwd=kafka_dir, env=jdk8_env, shell=True) cmd("Building and uploading archives", "./gradlew publish -PscalaVersion=2.13", cwd=kafka_dir, env=jdk8_env, shell=True)
cmd("Building and uploading archives", "mvn deploy -Pgpg-signing", cwd=os.path.join(kafka_dir, "streams/quickstart"), env=jdk8_env, shell=True) cmd("Building and uploading archives", "mvn deploy -Pgpg-signing", cwd=os.path.join(kafka_dir, "streams/quickstart"), env=jdk8_env, shell=True)
# TODO: Many of these suggested validation steps could be automated # TODO: Many of these suggested validation steps could be automated

View File

@ -77,7 +77,6 @@ import static org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_LIS
import static org.apache.kafka.server.config.ServerLogConfigs.LOG_DIRS_CONFIG; import static org.apache.kafka.server.config.ServerLogConfigs.LOG_DIRS_CONFIG;
@SuppressWarnings("deprecation") // Needed for Scala 2.12 compatibility
public class KafkaClusterTestKit implements AutoCloseable { public class KafkaClusterTestKit implements AutoCloseable {
private static final Logger log = LoggerFactory.getLogger(KafkaClusterTestKit.class); private static final Logger log = LoggerFactory.getLogger(KafkaClusterTestKit.class);