diff --git a/LICENSE-binary b/LICENSE-binary index 611909cba67..d067f3ffb52 100644 --- a/LICENSE-binary +++ b/LICENSE-binary @@ -226,7 +226,6 @@ jackson-jaxrs-json-provider-2.16.2 jackson-module-afterburner-2.16.2 jackson-module-jaxb-annotations-2.16.2 jackson-module-scala_2.13-2.16.2 -jackson-module-scala_2.12-2.16.2 jakarta.validation-api-2.0.2 javassist-3.29.2-GA jetty-client-9.4.54.v20240208 @@ -257,15 +256,9 @@ opentelemetry-proto-1.0.0-alpha plexus-utils-3.5.1 reload4j-1.2.25 rocksdbjni-7.9.2 -scala-collection-compat_2.12-2.10.0 -scala-collection-compat_2.13-2.10.0 -scala-library-2.12.19 scala-library-2.13.15 -scala-logging_2.12-3.9.5 scala-logging_2.13-3.9.5 -scala-reflect-2.12.19 scala-reflect-2.13.15 -scala-java8-compat_2.12-1.0.2 scala-java8-compat_2.13-1.0.2 snappy-java-1.1.10.5 swagger-annotations-2.2.8 diff --git a/README.md b/README.md index 6e90cac5c52..0be7bff7afa 100644 --- a/README.md +++ b/README.md @@ -11,9 +11,7 @@ the broker and tools has been deprecated since Apache Kafka 3.7 and removal of b see [KIP-750](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181308223) and [KIP-1013](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=284789510) for more details). -Scala 2.12 and 2.13 are supported and 2.13 is used by default. Scala 2.12 support has been deprecated since -Apache Kafka 3.0 and will be removed in Apache Kafka 4.0 (see [KIP-751](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181308218) -for more details). See below for how to use a specific Scala version or all of the supported Scala versions. +Scala 2.13 is the only supported version in Apache Kafka. ### Build a jar and run it ### ./gradlew jar @@ -122,23 +120,6 @@ Using compiled files: ### Cleaning the build ### ./gradlew clean -### Running a task with one of the Scala versions available (2.12.x or 2.13.x) ### -*Note that if building the jars with a version other than 2.13.x, you need to set the `SCALA_VERSION` variable or change it in `bin/kafka-run-class.sh` to run the quick start.* - -You can pass either the major version (eg 2.12) or the full version (eg 2.12.7): - - ./gradlew -PscalaVersion=2.12 jar - ./gradlew -PscalaVersion=2.12 test - ./gradlew -PscalaVersion=2.12 releaseTarGz - -### Running a task with all the scala versions enabled by default ### - -Invoke the `gradlewAll` script followed by the task(s): - - ./gradlewAll test - ./gradlewAll jar - ./gradlewAll releaseTarGz - ### Running a task for a specific project ### This is for `core`, `examples` and `clients` @@ -162,24 +143,6 @@ The `eclipse` task has been configured to use `${project_dir}/build_eclipse` as build directory (`${project_dir}/bin`) clashes with Kafka's scripts directory and we don't use Gradle's build directory to avoid known issues with this configuration. -### Publishing the jar for all versions of Scala and for all projects to maven ### -The recommended command is: - - ./gradlewAll publish - -For backwards compatibility, the following also works: - - ./gradlewAll uploadArchives - -Please note for this to work you should create/update `${GRADLE_USER_HOME}/gradle.properties` (typically, `~/.gradle/gradle.properties`) and assign the following variables - - mavenUrl= - mavenUsername= - mavenPassword= - signing.keyId= - signing.password= - signing.secretKeyRingFile= - ### Publishing the streams quickstart archetype artifact to maven ### For the Streams archetype project, one cannot use gradle to upload to maven; instead the `mvn deploy` command needs to be called at the quickstart folder: @@ -209,22 +172,10 @@ Please note for this to work you should create/update user maven settings (typic ... - -### Installing ALL the jars to the local Maven repository ### -The recommended command to build for both Scala 2.12 and 2.13 is: - - ./gradlewAll publishToMavenLocal - -For backwards compatibility, the following also works: - - ./gradlewAll install - ### Installing specific projects to the local Maven repository ### ./gradlew -PskipSigning=true :streams:publishToMavenLocal -If needed, you can specify the Scala version with `-PscalaVersion=2.13`. - ### Building the test jar ### ./gradlew testJar diff --git a/build.gradle b/build.gradle index ec55a370a7b..242143794e4 100644 --- a/build.gradle +++ b/build.gradle @@ -773,25 +773,11 @@ subprojects { scalaCompileOptions.additionalParameters += inlineFrom } - if (versions.baseScala != '2.12') { - scalaCompileOptions.additionalParameters += ["-opt-warnings", "-Xlint:strict-unsealed-patmat"] - // Scala 2.13.2 introduces compiler warnings suppression, which is a pre-requisite for -Xfatal-warnings - scalaCompileOptions.additionalParameters += ["-Xfatal-warnings"] - } + scalaCompileOptions.additionalParameters += ["-opt-warnings", "-Xlint:strict-unsealed-patmat"] + // Scala 2.13.2 introduces compiler warnings suppression, which is a pre-requisite for -Xfatal-warnings + scalaCompileOptions.additionalParameters += ["-Xfatal-warnings"] - // these options are valid for Scala versions < 2.13 only - // Scala 2.13 removes them, see https://github.com/scala/scala/pull/6502 and https://github.com/scala/scala/pull/5969 - if (versions.baseScala == '2.12') { - scalaCompileOptions.additionalParameters += [ - "-Xlint:by-name-right-associative", - "-Xlint:nullary-override", - "-Xlint:unsound-match" - ] - } - - // Scalac 2.12 `-release` requires Java 9 or higher, but Scala 2.13 doesn't have that restriction - if (versions.baseScala == "2.13" || JavaVersion.current().isJava9Compatible()) - scalaCompileOptions.additionalParameters += ["-release", String.valueOf(minJavaVersion)] + scalaCompileOptions.additionalParameters += ["-release", String.valueOf(minJavaVersion)] addParametersForTests(name, options) @@ -1096,7 +1082,6 @@ project(':core') { implementation libs.joptSimple implementation libs.jose4j implementation libs.metrics - implementation libs.scalaCollectionCompat implementation libs.scalaJava8Compat // only needed transitively, but set it explicitly to ensure it has the same version as scala-library implementation libs.scalaReflect @@ -2813,14 +2798,6 @@ project(':streams:streams-scala') { api project(':streams') api libs.scalaLibrary - if ( versions.baseScala == '2.12' ) { - // Scala-Collection-Compat isn't required when compiling with Scala 2.13 or later, - // and having it in the dependencies could lead to classpath conflicts in Scala 3 - // projects that use kafka-streams-kafka_2.13 (because we don't have a Scala 3 version yet) - // but also pull in scala-collection-compat_3 via another dependency. - // So we make sure to not include it in the dependencies. - api libs.scalaCollectionCompat - } testImplementation project(':group-coordinator') testImplementation project(':core') testImplementation project(':test-common') diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala index 6602d879759..4722311f6c0 100644 --- a/core/src/main/scala/kafka/admin/ConfigCommand.scala +++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala @@ -628,7 +628,7 @@ object ConfigCommand extends Logging { private def describeQuotaConfigs(adminClient: Admin, entityTypes: List[String], entityNames: List[String]): Unit = { val quotaConfigs = getAllClientQuotasConfigs(adminClient, entityTypes, entityNames) - quotaConfigs.forKeyValue { (entity, entries) => + quotaConfigs.foreachEntry { (entity, entries) => val entityEntries = entity.entries.asScala def entitySubstr(entityType: String): Option[String] = diff --git a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala index 33fec627486..77662c3b114 100644 --- a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala +++ b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala @@ -19,7 +19,6 @@ package kafka.admin import joptsimple.{OptionSet, OptionSpec, OptionSpecBuilder} import kafka.server.KafkaConfig -import kafka.utils.Implicits._ import kafka.utils.{Logging, ToolsUtils} import kafka.zk.{ControllerZNode, KafkaZkClient, ZkData, ZkSecurityMigratorUtils} import org.apache.kafka.common.security.JaasUtils @@ -130,7 +129,7 @@ object ZkSecurityMigrator extends Logging { // Now override any set system properties with explicitly-provided values from the config file // Emit INFO logs due to camel-case property names encouraging mistakes -- help people see mistakes they make info(s"Found ${zkTlsConfigFileProps.size()} ZooKeeper client configuration properties in file $filename") - zkTlsConfigFileProps.asScala.forKeyValue { (key, value) => + zkTlsConfigFileProps.asScala.foreachEntry { (key, value) => info(s"Setting $key") KafkaConfig.setZooKeeperClientProperty(zkClientConfig, key, value) } diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala index 27ca3c5e02e..cea7368378d 100755 --- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -19,7 +19,6 @@ package kafka.controller import com.yammer.metrics.core.{Gauge, Timer} import kafka.cluster.Broker import kafka.server.KafkaConfig -import kafka.utils.Implicits._ import kafka.utils._ import org.apache.kafka.clients._ import org.apache.kafka.common._ @@ -524,11 +523,11 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig, else if (metadataVersion.isAtLeast(IBP_1_0_IV0)) 1 else 0 - leaderAndIsrRequestMap.forKeyValue { (broker, leaderAndIsrPartitionStates) => + leaderAndIsrRequestMap.foreachEntry { (broker, leaderAndIsrPartitionStates) => if (metadataInstance.liveOrShuttingDownBrokerIds.contains(broker)) { val leaderIds = mutable.Set.empty[Int] var numBecomeLeaders = 0 - leaderAndIsrPartitionStates.forKeyValue { (topicPartition, state) => + leaderAndIsrPartitionStates.foreachEntry { (topicPartition, state) => leaderIds += state.leader val typeOfRequest = if (broker == state.leader) { numBecomeLeaders += 1 @@ -669,10 +668,10 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig, handleStopReplicaResponse(stopReplicaResponse, brokerId, partitionErrorsForDeletingTopics.toMap) } - stopReplicaRequestMap.forKeyValue { (brokerId, partitionStates) => + stopReplicaRequestMap.foreachEntry { (brokerId, partitionStates) => if (metadataInstance.liveOrShuttingDownBrokerIds.contains(brokerId)) { if (traceEnabled) - partitionStates.forKeyValue { (topicPartition, partitionState) => + partitionStates.foreachEntry { (topicPartition, partitionState) => stateChangeLog.trace(s"Sending StopReplica request $partitionState to " + s"broker $brokerId for partition $topicPartition") } @@ -680,7 +679,7 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig, val brokerEpoch = metadataInstance.liveBrokerIdAndEpochs(brokerId) if (stopReplicaRequestVersion >= 3) { val stopReplicaTopicState = mutable.Map.empty[String, StopReplicaTopicState] - partitionStates.forKeyValue { (topicPartition, partitionState) => + partitionStates.foreachEntry { (topicPartition, partitionState) => val topicState = stopReplicaTopicState.getOrElseUpdate(topicPartition.topic, new StopReplicaTopicState().setTopicName(topicPartition.topic)) topicState.partitionStates().add(partitionState) @@ -699,7 +698,7 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig, val topicStatesWithDelete = mutable.Map.empty[String, StopReplicaTopicState] val topicStatesWithoutDelete = mutable.Map.empty[String, StopReplicaTopicState] - partitionStates.forKeyValue { (topicPartition, partitionState) => + partitionStates.foreachEntry { (topicPartition, partitionState) => val topicStates = if (partitionState.deletePartition()) { numPartitionStateWithDelete += 1 topicStatesWithDelete diff --git a/core/src/main/scala/kafka/controller/ControllerContext.scala b/core/src/main/scala/kafka/controller/ControllerContext.scala index 2df4e0f78f0..10420716416 100644 --- a/core/src/main/scala/kafka/controller/ControllerContext.scala +++ b/core/src/main/scala/kafka/controller/ControllerContext.scala @@ -18,7 +18,6 @@ package kafka.controller import kafka.cluster.Broker -import kafka.utils.Implicits._ import org.apache.kafka.common.{TopicPartition, Uuid} import org.apache.kafka.metadata.LeaderAndIsr @@ -522,7 +521,7 @@ class ControllerContext extends ControllerChannelContext { } private def cleanPreferredReplicaImbalanceMetric(topic: String): Unit = { - partitionAssignments.getOrElse(topic, mutable.Map.empty).forKeyValue { (partition, replicaAssignment) => + partitionAssignments.getOrElse(topic, mutable.Map.empty).foreachEntry { (partition, replicaAssignment) => partitionLeadershipInfo.get(new TopicPartition(topic, partition)).foreach { leadershipInfo => if (!hasPreferredLeader(replicaAssignment, leadershipInfo)) preferredReplicaImbalanceCount -= 1 diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index bd4df02ca3b..0f9d65ebb1c 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -26,7 +26,6 @@ import kafka.coordinator.transaction.ZkProducerIdManager import kafka.server._ import kafka.server.metadata.ZkFinalizedFeatureCache import kafka.utils._ -import kafka.utils.Implicits._ import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult import kafka.zk.TopicZNode.TopicIdReplicaAssignment import kafka.zk.{FeatureZNodeStatus, _} @@ -1030,7 +1029,7 @@ class KafkaController(val config: KafkaConfig, private def updateLeaderAndIsrCache(partitions: Seq[TopicPartition] = controllerContext.allPartitions.toSeq): Unit = { val leaderIsrAndControllerEpochs = zkClient.getTopicPartitionStates(partitions) - leaderIsrAndControllerEpochs.forKeyValue { (partition, leaderIsrAndControllerEpoch) => + leaderIsrAndControllerEpochs.foreachEntry { (partition, leaderIsrAndControllerEpoch) => controllerContext.putPartitionLeadershipInfo(partition, leaderIsrAndControllerEpoch) } } @@ -1297,7 +1296,7 @@ class KafkaController(val config: KafkaConfig, }.toMap.groupBy { case (_, assignedReplicas) => assignedReplicas.head } // for each broker, check if a preferred replica election needs to be triggered - preferredReplicasForTopicsByBrokers.forKeyValue { (leaderBroker, topicPartitionsForBroker) => + preferredReplicasForTopicsByBrokers.foreachEntry { (leaderBroker, topicPartitionsForBroker) => val topicsNotInPreferredReplica = topicPartitionsForBroker.filter { case (topicPartition, _) => val leadershipInfo = controllerContext.partitionLeadershipInfo(topicPartition) leadershipInfo.exists(_.leaderAndIsr.leader != leaderBroker) @@ -1776,7 +1775,7 @@ class KafkaController(val config: KafkaConfig, } } else if (partitionsToBeAdded.nonEmpty) { info(s"New partitions to be added $partitionsToBeAdded") - partitionsToBeAdded.forKeyValue { (topicPartition, assignedReplicas) => + partitionsToBeAdded.foreachEntry { (topicPartition, assignedReplicas) => controllerContext.updatePartitionFullReplicaAssignment(topicPartition, assignedReplicas) } onNewPartitionCreation(partitionsToBeAdded.keySet) @@ -1821,7 +1820,7 @@ class KafkaController(val config: KafkaConfig, val reassignmentResults = mutable.Map.empty[TopicPartition, ApiError] val partitionsToReassign = mutable.Map.empty[TopicPartition, ReplicaAssignment] - zkClient.getPartitionReassignment.forKeyValue { (tp, targetReplicas) => + zkClient.getPartitionReassignment.foreachEntry { (tp, targetReplicas) => maybeBuildReassignment(tp, Some(targetReplicas)) match { case Some(context) => partitionsToReassign.put(tp, context) case None => reassignmentResults.put(tp, new ApiError(Errors.NO_REASSIGNMENT_IN_PROGRESS)) @@ -1858,7 +1857,7 @@ class KafkaController(val config: KafkaConfig, val reassignmentResults = mutable.Map.empty[TopicPartition, ApiError] val partitionsToReassign = mutable.Map.empty[TopicPartition, ReplicaAssignment] - reassignments.forKeyValue { (tp, targetReplicas) => + reassignments.foreachEntry { (tp, targetReplicas) => val maybeApiError = targetReplicas.flatMap(validateReplicas(tp, _)) maybeApiError match { case None => @@ -2304,7 +2303,7 @@ class KafkaController(val config: KafkaConfig, // After we have returned the result of the `AlterPartition` request, we should check whether // there are any reassignments which can be completed by a successful ISR expansion. - partitionResponses.forKeyValue { (topicPartition, partitionResponse) => + partitionResponses.foreachEntry { (topicPartition, partitionResponse) => if (controllerContext.partitionsBeingReassigned.contains(topicPartition)) { val isSuccessfulUpdate = partitionResponse.isRight if (isSuccessfulUpdate) { @@ -2480,7 +2479,7 @@ class KafkaController(val config: KafkaConfig, partitionsToAlter.keySet ) - partitionResponses.groupBy(_._1.topic).forKeyValue { (topicName, partitionResponses) => + partitionResponses.groupBy(_._1.topic).foreachEntry { (topicName, partitionResponses) => // Add each topic part to the response val topicResponse = if (useTopicsIds) { new AlterPartitionResponseData.TopicData() @@ -2491,7 +2490,7 @@ class KafkaController(val config: KafkaConfig, } alterPartitionResponse.topics.add(topicResponse) - partitionResponses.forKeyValue { (tp, errorOrIsr) => + partitionResponses.foreachEntry { (tp, errorOrIsr) => // Add each partition part to the response (new ISR or error) errorOrIsr match { case Left(error) => diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala index 2af5381d9b1..c0b92b9c638 100755 --- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala +++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala @@ -19,7 +19,6 @@ package kafka.controller import kafka.common.StateChangeFailedException import kafka.controller.Election._ import kafka.server.KafkaConfig -import kafka.utils.Implicits._ import kafka.utils.Logging import kafka.zk.KafkaZkClient import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult @@ -437,7 +436,7 @@ class ZkPartitionStateMachine(config: KafkaConfig, val adjustedLeaderAndIsrs = partitionsWithLeaders.map(result => result.topicPartition -> result.leaderAndIsr.get).toMap val UpdateLeaderAndIsrResult(finishedUpdates, updatesToRetry) = zkClient.updateLeaderAndIsr( adjustedLeaderAndIsrs, controllerContext.epoch, controllerContext.epochZkVersion) - finishedUpdates.forKeyValue { (partition, result) => + finishedUpdates.foreachEntry { (partition, result) => result.foreach { leaderAndIsr => val replicaAssignment = controllerContext.partitionFullReplicaAssignment(partition) val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerContext.epoch) diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala index 5a299a469c0..406fff2b51b 100644 --- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala +++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala @@ -18,7 +18,6 @@ package kafka.controller import kafka.common.StateChangeFailedException import kafka.server.KafkaConfig -import kafka.utils.Implicits._ import kafka.utils.Logging import kafka.zk.KafkaZkClient import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult @@ -110,7 +109,7 @@ class ZkReplicaStateMachine(config: KafkaConfig, if (replicas.nonEmpty) { try { controllerBrokerRequestBatch.newBatch() - replicas.groupBy(_.replica).forKeyValue { (replicaId, replicas) => + replicas.groupBy(_.replica).foreachEntry { (replicaId, replicas) => doHandleStateChanges(replicaId, replicas, targetState) } controllerBrokerRequestBatch.sendRequestsToBrokers(controllerContext.epoch) @@ -227,7 +226,7 @@ class ZkReplicaStateMachine(config: KafkaConfig, controllerContext.partitionLeadershipInfo(replica.topicPartition).isDefined } val updatedLeaderIsrAndControllerEpochs = removeReplicasFromIsr(replicaId, replicasWithLeadershipInfo.map(_.topicPartition)) - updatedLeaderIsrAndControllerEpochs.forKeyValue { (partition, leaderIsrAndControllerEpoch) => + updatedLeaderIsrAndControllerEpochs.foreachEntry { (partition, leaderIsrAndControllerEpoch) => stateLogger.info(s"Partition $partition state changed to $leaderIsrAndControllerEpoch after removing replica $replicaId from the ISR as part of transition to $OfflineReplica") if (!controllerContext.isTopicQueuedUpForDeletion(partition.topic)) { val recipients = controllerContext.partitionReplicaAssignment(partition).filterNot(_ == replicaId) diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala index 01756550569..fcf5766b577 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala @@ -18,7 +18,6 @@ package kafka.coordinator.group import kafka.common.OffsetAndMetadata import kafka.server.{KafkaConfig, ReplicaManager} -import kafka.utils.Implicits.MapExtensionMethods import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid} import org.apache.kafka.common.message.{ConsumerGroupDescribeResponseData, ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData, DeleteGroupsResponseData, DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, OffsetDeleteResponseData, OffsetFetchRequestData, OffsetFetchResponseData, ShareGroupDescribeResponseData, ShareGroupHeartbeatRequestData, ShareGroupHeartbeatResponseData, SyncGroupRequestData, SyncGroupResponseData, TxnOffsetCommitRequestData, TxnOffsetCommitResponseData} import org.apache.kafka.common.metrics.Metrics @@ -281,7 +280,7 @@ private[group] class GroupCoordinatorAdapter( coordinator.handleDeleteGroups( groupIds.asScala.toSet, new RequestLocal(bufferSupplier) - ).forKeyValue { (groupId, error) => + ).foreachEntry { (groupId, error) => results.add(new DeleteGroupsResponseData.DeletableGroupResult() .setGroupId(groupId) .setErrorCode(error.code)) @@ -338,7 +337,7 @@ private[group] class GroupCoordinatorAdapter( val topicsList = new util.ArrayList[OffsetFetchResponseData.OffsetFetchResponseTopics]() val topicsMap = new mutable.HashMap[String, OffsetFetchResponseData.OffsetFetchResponseTopics]() - results.forKeyValue { (tp, offset) => + results.foreachEntry { (tp, offset) => val topic = topicsMap.get(tp.topic) match { case Some(topic) => topic @@ -378,7 +377,7 @@ private[group] class GroupCoordinatorAdapter( val response = new OffsetCommitResponseData() val byTopics = new mutable.HashMap[String, OffsetCommitResponseData.OffsetCommitResponseTopic]() - commitStatus.forKeyValue { (tp, error) => + commitStatus.foreachEntry { (tp, error) => val topic = byTopics.get(tp.topic) match { case Some(existingTopic) => existingTopic @@ -445,7 +444,7 @@ private[group] class GroupCoordinatorAdapter( val response = new TxnOffsetCommitResponseData() val byTopics = new mutable.HashMap[String, TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic]() - results.forKeyValue { (tp, error) => + results.foreachEntry { (tp, error) => val topic = byTopics.get(tp.topic) match { case Some(existingTopic) => existingTopic @@ -546,7 +545,7 @@ private[group] class GroupCoordinatorAdapter( future.completeExceptionally(groupError.exception) } else { val response = new OffsetDeleteResponseData() - topicPartitionResults.forKeyValue { (topicPartition, error) => + topicPartitionResults.foreachEntry { (topicPartition, error) => var topic = response.topics.find(topicPartition.topic) if (topic == null) { topic = new OffsetDeleteResponseData.OffsetDeleteResponseTopic().setName(topicPartition.topic) diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala index 5bf69e9aa32..7d74512b712 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala @@ -22,7 +22,6 @@ import java.util.concurrent.locks.ReentrantLock import kafka.common.OffsetAndMetadata import kafka.utils.{CoreUtils, Logging, nonthreadsafe} -import kafka.utils.Implicits._ import org.apache.kafka.clients.consumer.internals.ConsumerProtocol import org.apache.kafka.common.{TopicIdPartition, TopicPartition} import org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember @@ -651,7 +650,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState def prepareOffsetCommit(offsets: Map[TopicIdPartition, OffsetAndMetadata]): Unit = { receivedConsumerOffsetCommits = true - offsets.forKeyValue { (topicIdPartition, offsetAndMetadata) => + offsets.foreachEntry { (topicIdPartition, offsetAndMetadata) => pendingOffsetCommits += topicIdPartition.topicPartition -> offsetAndMetadata } } @@ -662,7 +661,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState val producerOffsets = pendingTransactionalOffsetCommits.getOrElseUpdate(producerId, mutable.Map.empty[TopicPartition, CommitRecordMetadataAndOffset]) - offsets.forKeyValue { (topicIdPartition, offsetAndMetadata) => + offsets.foreachEntry { (topicIdPartition, offsetAndMetadata) => producerOffsets.put(topicIdPartition.topicPartition, CommitRecordMetadataAndOffset(None, offsetAndMetadata)) } } @@ -708,7 +707,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState val pendingOffsetsOpt = pendingTransactionalOffsetCommits.remove(producerId) if (isCommit) { pendingOffsetsOpt.foreach { pendingOffsets => - pendingOffsets.forKeyValue { (topicPartition, commitRecordMetadataAndOffset) => + pendingOffsets.foreachEntry { (topicPartition, commitRecordMetadataAndOffset) => if (commitRecordMetadataAndOffset.appendedBatchOffset.isEmpty) throw new IllegalStateException(s"Trying to complete a transactional offset commit for producerId $producerId " + s"and groupId $groupId even though the offset commit record itself hasn't been appended to the log.") @@ -746,7 +745,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState def removeOffsets(topicPartitions: Seq[TopicPartition]): immutable.Map[TopicPartition, OffsetAndMetadata] = { topicPartitions.flatMap { topicPartition => pendingOffsetCommits.remove(topicPartition) - pendingTransactionalOffsetCommits.forKeyValue { (_, pendingOffsets) => + pendingTransactionalOffsetCommits.foreachEntry { (_, pendingOffsets) => pendingOffsets.remove(topicPartition) } val removedOffset = offsets.remove(topicPartition) diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala index 21bbae6487b..8db98654b94 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala @@ -30,7 +30,6 @@ import kafka.common.OffsetAndMetadata import kafka.coordinator.group.GroupMetadataManager.maybeConvertOffsetCommitError import kafka.server.ReplicaManager import kafka.utils.CoreUtils.inLock -import kafka.utils.Implicits._ import kafka.utils._ import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.compress.Compression @@ -393,7 +392,7 @@ class GroupMetadataManager(brokerId: Int, val responseError = group.inLock { if (status.error == Errors.NONE) { if (!group.is(Dead)) { - filteredOffsetMetadata.forKeyValue { (topicIdPartition, offsetAndMetadata) => + filteredOffsetMetadata.foreachEntry { (topicIdPartition, offsetAndMetadata) => if (isTxnOffsetCommit) group.onTxnOffsetCommitAppend(producerId, topicIdPartition, CommitRecordMetadataAndOffset(Some(status.baseOffset), offsetAndMetadata)) else @@ -409,7 +408,7 @@ class GroupMetadataManager(brokerId: Int, if (!group.is(Dead)) { if (!group.hasPendingOffsetCommitsFromProducer(producerId)) removeProducerGroup(producerId, group.groupId) - filteredOffsetMetadata.forKeyValue { (topicIdPartition, offsetAndMetadata) => + filteredOffsetMetadata.foreachEntry { (topicIdPartition, offsetAndMetadata) => if (isTxnOffsetCommit) group.failPendingTxnOffsetCommit(producerId, topicIdPartition) else @@ -705,11 +704,11 @@ class GroupMetadataManager(brokerId: Int, }.partition { case (group, _) => loadedGroups.contains(group) } val pendingOffsetsByGroup = mutable.Map[String, mutable.Map[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]]]() - pendingOffsets.forKeyValue { (producerId, producerOffsets) => + pendingOffsets.foreachEntry { (producerId, producerOffsets) => producerOffsets.keySet.map(_.group).foreach(addProducerGroup(producerId, _)) producerOffsets .groupBy(_._1.group) - .forKeyValue { (group, offsets) => + .foreachEntry { (group, offsets) => val groupPendingOffsets = pendingOffsetsByGroup.getOrElseUpdate(group, mutable.Map.empty[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]]) val groupProducerOffsets = groupPendingOffsets.getOrElseUpdate(producerId, mutable.Map.empty[TopicPartition, CommitRecordMetadataAndOffset]) groupProducerOffsets ++= offsets.map { case (groupTopicPartition, offset) => @@ -878,7 +877,7 @@ class GroupMetadataManager(brokerId: Int, replicaManager.onlinePartition(appendPartition).foreach { partition => val tombstones = ArrayBuffer.empty[SimpleRecord] - removedOffsets.forKeyValue { (topicPartition, offsetAndMetadata) => + removedOffsets.foreachEntry { (topicPartition, offsetAndMetadata) => trace(s"Removing expired/deleted offset and metadata for $groupId, $topicPartition: $offsetAndMetadata") val commitKey = GroupMetadataManager.offsetCommitKey(groupId, topicPartition) tombstones += new SimpleRecord(timestamp, commitKey, null) @@ -971,7 +970,7 @@ class GroupMetadataManager(brokerId: Int, } private def removeGroupFromAllProducers(groupId: String): Unit = openGroupsForProducer synchronized { - openGroupsForProducer.forKeyValue { (_, groups) => + openGroupsForProducer.foreachEntry { (_, groups) => groups.remove(groupId) } } diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala index 410d7f044ee..afd645877d1 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala @@ -22,7 +22,6 @@ import kafka.coordinator.transaction.TransactionMarkerChannelManager.{LogAppendR import java.util import java.util.concurrent.{BlockingQueue, ConcurrentHashMap, LinkedBlockingQueue} import kafka.server.{KafkaConfig, MetadataCache} -import kafka.utils.Implicits._ import kafka.utils.{CoreUtils, Logging} import org.apache.kafka.clients._ import org.apache.kafka.common.metrics.Metrics @@ -147,7 +146,7 @@ class TxnMarkerQueue(@volatile var destination: Node) extends Logging { } def forEachTxnTopicPartition[B](f:(Int, BlockingQueue[PendingCompleteTxnAndMarkerEntry]) => B): Unit = - markersPerTxnTopicPartition.forKeyValue { (partition, queue) => + markersPerTxnTopicPartition.foreachEntry { (partition, queue) => if (!queue.isEmpty) f(partition, queue) } diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala index 094c4d950cc..5abd9768767 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala @@ -23,7 +23,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock import kafka.server.{MetadataCache, ReplicaManager} import kafka.utils.CoreUtils.{inReadLock, inWriteLock} import kafka.utils.{Logging, Pool} -import kafka.utils.Implicits._ import org.apache.kafka.common.config.TopicConfig import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.message.ListTransactionsResponseData @@ -237,7 +236,7 @@ class TransactionStateManager(brokerId: Int, private[transaction] def removeExpiredTransactionalIds(): Unit = { inReadLock(stateLock) { - transactionMetadataCache.forKeyValue { (partitionId, partitionCacheEntry) => + transactionMetadataCache.foreachEntry { (partitionId, partitionCacheEntry) => val transactionPartition = new TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partitionId) removeExpiredTransactionalIds(transactionPartition, partitionCacheEntry) } @@ -250,7 +249,7 @@ class TransactionStateManager(brokerId: Int, tombstoneRecords: MemoryRecords ): Unit = { def removeFromCacheCallback(responses: collection.Map[TopicPartition, PartitionResponse]): Unit = { - responses.forKeyValue { (topicPartition, response) => + responses.foreachEntry { (topicPartition, response) => inReadLock(stateLock) { transactionMetadataCache.get(topicPartition.partition).foreach { txnMetadataCacheEntry => expiredForPartition.foreach { idCoordinatorEpochAndMetadata => @@ -345,7 +344,7 @@ class TransactionStateManager(brokerId: Int, } val states = new java.util.ArrayList[ListTransactionsResponseData.TransactionState] - transactionMetadataCache.forKeyValue { (_, cache) => + transactionMetadataCache.foreachEntry { (_, cache) => cache.metadataPerTransactionalId.values.foreach { txnMetadata => txnMetadata.inLock { if (shouldInclude(txnMetadata)) { diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index 176e510c6d0..a0e68364138 100755 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -35,7 +35,6 @@ import scala.jdk.CollectionConverters._ import scala.collection._ import scala.collection.mutable.ArrayBuffer import scala.util.{Failure, Success, Try} -import kafka.utils.Implicits._ import org.apache.kafka.common.config.TopicConfig import org.apache.kafka.common.requests.{AbstractControlRequest, LeaderAndIsrRequest} import org.apache.kafka.image.TopicsImage @@ -699,7 +698,7 @@ class LogManager(logDirs: Seq[File], } try { - jobs.forKeyValue { (dir, dirJobs) => + jobs.foreachEntry { (dir, dirJobs) => if (waitForAllToComplete(dirJobs, e => warn(s"There was an error in one of the threads during LogManager shutdown: ${e.getCause}"))) { val logs = logsInDir(localLogsByDir, dir) diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala index ca4ab90957b..fc215adb3aa 100644 --- a/core/src/main/scala/kafka/network/RequestChannel.scala +++ b/core/src/main/scala/kafka/network/RequestChannel.scala @@ -24,7 +24,6 @@ import com.typesafe.scalalogging.Logger import kafka.network import kafka.server.KafkaConfig import kafka.utils.Logging -import kafka.utils.Implicits._ import org.apache.kafka.common.config.ConfigResource import org.apache.kafka.common.memory.MemoryPool import org.apache.kafka.common.message.EnvelopeResponseData @@ -465,7 +464,7 @@ class RequestChannel(val queueSize: Int, requestQueue.take() def updateErrorMetrics(apiKey: ApiKeys, errors: collection.Map[Errors, Integer]): Unit = { - errors.forKeyValue { (error, count) => + errors.foreachEntry { (error, count) => metrics(apiKey.name).markErrorMeter(error, count) } } diff --git a/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala b/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala index dd5df2f2da4..501c6e2ece3 100644 --- a/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala +++ b/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala @@ -21,7 +21,6 @@ import java.util.concurrent.{CompletableFuture, CompletionStage} import com.typesafe.scalalogging.Logger import kafka.server.{KafkaConfig, KafkaServer} import kafka.utils._ -import kafka.utils.Implicits._ import kafka.zk._ import org.apache.kafka.common.Endpoint import org.apache.kafka.common.acl._ @@ -107,7 +106,7 @@ object AclAuthorizer { // be sure to force creation since the zkSslClientEnable property in the kafkaConfig could be false val zkClientConfig = KafkaServer.zkClientConfigFromKafkaConfig(kafkaConfig, forceZkSslClientEnable = true) // add in any prefixed overlays - ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.asScala.forKeyValue { (kafkaProp, sysProp) => + ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.asScala.foreachEntry { (kafkaProp, sysProp) => configMap.get(AclAuthorizer.configPrefix + kafkaProp).foreach { prefixedValue => zkClientConfig.setProperty(sysProp, if (kafkaProp == ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG) @@ -187,7 +186,7 @@ class AclAuthorizer extends Authorizer with Logging { override def configure(javaConfigs: util.Map[String, _]): Unit = { val configs = javaConfigs.asScala val props = new java.util.Properties() - configs.forKeyValue { (key, value) => props.put(key, value.toString.trim) } + configs.foreachEntry { (key, value) => props.put(key, value.toString.trim) } superUsers = configs.get(AclAuthorizer.SuperUsersProp).collect { case str: String if str.nonEmpty => str.split(";").map(s => SecurityUtils.parseKafkaPrincipal(s.trim)).toSet @@ -251,7 +250,7 @@ class AclAuthorizer extends Authorizer with Logging { if (aclsToCreate.nonEmpty) { lock synchronized { - aclsToCreate.forKeyValue { (resource, aclsWithIndex) => + aclsToCreate.foreachEntry { (resource, aclsWithIndex) => try { updateResourceAcls(resource) { currentAcls => val newAcls = aclsWithIndex.map { case (acl, _) => new AclEntry(acl.entry) } @@ -299,7 +298,7 @@ class AclAuthorizer extends Authorizer with Logging { resource -> matchingFilters }.toMap.filter(_._2.nonEmpty) - resourcesToUpdate.forKeyValue { (resource, matchingFilters) => + resourcesToUpdate.foreachEntry { (resource, matchingFilters) => val resourceBindingsBeingDeleted = new mutable.HashMap[AclBinding, Int]() try { updateResourceAcls(resource) { currentAcls => @@ -334,7 +333,7 @@ class AclAuthorizer extends Authorizer with Logging { override def acls(filter: AclBindingFilter): lang.Iterable[AclBinding] = { val aclBindings = new util.ArrayList[AclBinding]() - aclCache.forKeyValue { case (resource, versionedAcls) => + aclCache.foreachEntry { case (resource, versionedAcls) => versionedAcls.acls.foreach { acl => val binding = new AclBinding(resource, acl.ace) if (filter.matches(binding)) @@ -552,7 +551,7 @@ class AclAuthorizer extends Authorizer with Logging { aclCacheSnapshot .from(new ResourcePattern(resourceType, resourceName, PatternType.PREFIXED)) .to(new ResourcePattern(resourceType, resourceName.take(1), PatternType.PREFIXED)) - .forKeyValue { (resource, acls) => + .foreachEntry { (resource, acls) => if (resourceName.startsWith(resource.name)) prefixed ++= acls.acls } diff --git a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala index eced213734b..c584c987b01 100755 --- a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala @@ -18,7 +18,6 @@ package kafka.server import kafka.cluster.BrokerEndPoint -import kafka.utils.Implicits._ import kafka.utils.Logging import org.apache.kafka.common.{TopicPartition, Uuid} import org.apache.kafka.common.utils.Utils @@ -66,11 +65,11 @@ abstract class AbstractFetcherManager[T <: AbstractFetcherThread](val name: Stri def resizeThreadPool(newSize: Int): Unit = { def migratePartitions(newSize: Int): Unit = { val allRemovedPartitionsMap = mutable.Map[TopicPartition, InitialFetchState]() - fetcherThreadMap.forKeyValue { (id, thread) => + fetcherThreadMap.foreachEntry { (id, thread) => val partitionStates = thread.removeAllPartitions() if (id.fetcherId >= newSize) thread.shutdown() - partitionStates.forKeyValue { (topicPartition, currentFetchState) => + partitionStates.foreachEntry { (topicPartition, currentFetchState) => val initialFetchState = InitialFetchState(currentFetchState.topicId, thread.leader.brokerEndPoint(), currentLeaderEpoch = currentFetchState.currentLeaderEpoch, initOffset = currentFetchState.fetchOffset) diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala index 119f9d01bae..73c89a94399 100755 --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -20,7 +20,6 @@ package kafka.server import com.yammer.metrics.core.Meter import kafka.server.AbstractFetcherThread.{ReplicaFetch, ResultWithPartitions} import kafka.utils.CoreUtils.inLock -import kafka.utils.Implicits._ import kafka.utils.{DelayedItem, Logging, Pool} import org.apache.kafka.common.errors._ import org.apache.kafka.common.internals.PartitionStates @@ -256,7 +255,7 @@ abstract class AbstractFetcherThread(name: String, val fetchOffsets = mutable.HashMap.empty[TopicPartition, OffsetTruncationState] val partitionsWithError = mutable.HashSet.empty[TopicPartition] - fetchedEpochs.forKeyValue { (tp, leaderEpochOffset) => + fetchedEpochs.foreachEntry { (tp, leaderEpochOffset) => if (partitionStates.contains(tp)) { Errors.forCode(leaderEpochOffset.errorCode) match { case Errors.NONE => @@ -329,7 +328,7 @@ abstract class AbstractFetcherThread(name: String, if (responseData.nonEmpty) { // process fetched data inLock(partitionMapLock) { - responseData.forKeyValue { (topicPartition, partitionData) => + responseData.foreachEntry { (topicPartition, partitionData) => Option(partitionStates.stateValue(topicPartition)).foreach { currentFetchState => // It's possible that a partition is removed and re-added or truncated when there is a pending fetch request. // In this case, we only want to process the fetch response if the partition state is ready for fetch and @@ -508,7 +507,7 @@ abstract class AbstractFetcherThread(name: String, try { failedPartitions.removeAll(initialFetchStates.keySet) - initialFetchStates.forKeyValue { (tp, initialFetchState) => + initialFetchStates.foreachEntry { (tp, initialFetchState) => val currentState = partitionStates.stateValue(tp) val updatedState = partitionFetchState(tp, initialFetchState, currentState) partitionStates.updateAndMoveToEnd(tp, updatedState) diff --git a/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala b/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala index 90e8114583b..0b680e034b2 100644 --- a/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala +++ b/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala @@ -18,7 +18,6 @@ package kafka.server import kafka.server.AddPartitionsToTxnManager.{VerificationFailureRateMetricName, VerificationTimeMsMetricName} -import kafka.utils.Implicits.MapExtensionMethods import kafka.utils.Logging import org.apache.kafka.clients.{ClientResponse, NetworkClient, RequestCompletionHandler} import org.apache.kafka.common.internals.Topic @@ -99,7 +98,7 @@ class AddPartitionsToTxnManager( callback(topicPartitions.map(tp => tp -> Errors.COORDINATOR_NOT_AVAILABLE).toMap) } else { val topicCollection = new AddPartitionsToTxnTopicCollection() - topicPartitions.groupBy(_.topic).forKeyValue { (topic, tps) => + topicPartitions.groupBy(_.topic).foreachEntry { (topic, tps) => topicCollection.add(new AddPartitionsToTxnTopic() .setName(topic) .setPartitions(tps.map(tp => Int.box(tp.partition)).toList.asJava)) diff --git a/core/src/main/scala/kafka/server/AuthHelper.scala b/core/src/main/scala/kafka/server/AuthHelper.scala index 6e779ce9007..4d21fb43859 100644 --- a/core/src/main/scala/kafka/server/AuthHelper.scala +++ b/core/src/main/scala/kafka/server/AuthHelper.scala @@ -20,7 +20,6 @@ package kafka.server import java.lang.{Byte => JByte} import java.util.Collections import kafka.network.RequestChannel -import kafka.utils.CoreUtils import org.apache.kafka.clients.admin.EndpointType import org.apache.kafka.common.acl.AclOperation import org.apache.kafka.common.acl.AclOperation.DESCRIBE @@ -105,7 +104,7 @@ class AuthHelper(authorizer: Option[Authorizer]) { logIfDenied: Boolean = true)(resourceName: T => String): Set[String] = { authorizer match { case Some(authZ) => - val resourceNameToCount = CoreUtils.groupMapReduce(resources)(resourceName)(_ => 1)(_ + _) + val resourceNameToCount = resources.groupMapReduce(resourceName)(_ => 1)(_ + _) val actions = resourceNameToCount.iterator.map { case (resourceName, count) => val resource = new ResourcePattern(resourceType, resourceName, PatternType.LITERAL) new Action(operation, resource, count, logIfAllowed, logIfDenied) diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala b/core/src/main/scala/kafka/server/ConfigHandler.scala index 77a3874a3dd..8e8c603f1d2 100644 --- a/core/src/main/scala/kafka/server/ConfigHandler.scala +++ b/core/src/main/scala/kafka/server/ConfigHandler.scala @@ -24,7 +24,6 @@ import kafka.log.UnifiedLog import kafka.network.ConnectionQuotas import kafka.server.Constants._ import kafka.server.QuotaFactory.QuotaManagers -import kafka.utils.Implicits._ import kafka.utils.Logging import org.apache.kafka.server.config.{QuotaConfigs, ReplicationConfigs, ZooKeeperInternals} import org.apache.kafka.common.config.TopicConfig @@ -64,7 +63,7 @@ class TopicConfigHandler(private val replicaManager: ReplicaManager, // Validate the configurations. val configNamesToExclude = excludedConfigs(topic, topicConfig) val props = new Properties() - topicConfig.asScala.forKeyValue { (key, value) => + topicConfig.asScala.foreachEntry { (key, value) => if (!configNamesToExclude.contains(key)) props.put(key, value) } diff --git a/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala b/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala index 6d25bd1f5c8..0375cc8a072 100644 --- a/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala +++ b/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala @@ -20,7 +20,6 @@ package kafka.server import java.util.concurrent.TimeUnit -import kafka.utils.Implicits._ import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.message.DeleteRecordsResponseData import org.apache.kafka.common.protocol.Errors @@ -49,7 +48,7 @@ class DelayedDeleteRecords(delayMs: Long, extends DelayedOperation(delayMs) { // first update the acks pending variable according to the error code - deleteRecordsStatus.forKeyValue { (topicPartition, status) => + deleteRecordsStatus.foreachEntry { (topicPartition, status) => if (status.responseStatus.errorCode == Errors.NONE.code) { // Timeout error state will be cleared when required acks are received status.acksPending = true @@ -70,7 +69,7 @@ class DelayedDeleteRecords(delayMs: Long, */ override def tryComplete(): Boolean = { // check for each partition if it still has pending acks - deleteRecordsStatus.forKeyValue { (topicPartition, status) => + deleteRecordsStatus.foreachEntry { (topicPartition, status) => trace(s"Checking delete records satisfaction for $topicPartition, current status $status") // skip those partitions that have already been satisfied if (status.acksPending) { @@ -106,7 +105,7 @@ class DelayedDeleteRecords(delayMs: Long, } override def onExpiration(): Unit = { - deleteRecordsStatus.forKeyValue { (topicPartition, status) => + deleteRecordsStatus.foreachEntry { (topicPartition, status) => if (status.acksPending) { DelayedDeleteRecordsMetrics.recordExpiration(topicPartition) } diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala b/core/src/main/scala/kafka/server/DelayedProduce.scala index 7a21a86260c..c6470c81358 100644 --- a/core/src/main/scala/kafka/server/DelayedProduce.scala +++ b/core/src/main/scala/kafka/server/DelayedProduce.scala @@ -21,7 +21,6 @@ import java.util.concurrent.TimeUnit import java.util.concurrent.locks.Lock import com.typesafe.scalalogging.Logger import com.yammer.metrics.core.Meter -import kafka.utils.Implicits._ import kafka.utils.Pool import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.protocol.Errors @@ -65,7 +64,7 @@ class DelayedProduce(delayMs: Long, override lazy val logger: Logger = DelayedProduce.logger // first update the acks pending variable according to the error code - produceMetadata.produceStatus.forKeyValue { (topicPartition, status) => + produceMetadata.produceStatus.foreachEntry { (topicPartition, status) => if (status.responseStatus.error == Errors.NONE) { // Timeout error state will be cleared when required acks are received status.acksPending = true @@ -90,7 +89,7 @@ class DelayedProduce(delayMs: Long, */ override def tryComplete(): Boolean = { // check for each partition if it still has pending acks - produceMetadata.produceStatus.forKeyValue { (topicPartition, status) => + produceMetadata.produceStatus.foreachEntry { (topicPartition, status) => trace(s"Checking produce satisfaction for $topicPartition, current status $status") // skip those partitions that have already been satisfied if (status.acksPending) { @@ -119,7 +118,7 @@ class DelayedProduce(delayMs: Long, } override def onExpiration(): Unit = { - produceMetadata.produceStatus.forKeyValue { (topicPartition, status) => + produceMetadata.produceStatus.foreachEntry { (topicPartition, status) => if (status.acksPending) { debug(s"Expiring produce request for partition $topicPartition with status $status") DelayedProduceMetrics.recordExpiration(topicPartition) diff --git a/core/src/main/scala/kafka/server/DelayedRemoteListOffsets.scala b/core/src/main/scala/kafka/server/DelayedRemoteListOffsets.scala index 0da3cc20764..8fca0226a0b 100644 --- a/core/src/main/scala/kafka/server/DelayedRemoteListOffsets.scala +++ b/core/src/main/scala/kafka/server/DelayedRemoteListOffsets.scala @@ -18,7 +18,6 @@ package kafka.server import com.yammer.metrics.core.Meter import kafka.log.AsyncOffsetReadFutureHolder -import kafka.utils.Implicits._ import kafka.utils.Pool import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.errors.ApiException @@ -58,7 +57,7 @@ class DelayedRemoteListOffsets(delayMs: Long, // Mark the status as completed, if there is no async task to track. // If there is a task to track, then build the response as REQUEST_TIMED_OUT by default. - metadata.statusByPartition.forKeyValue { (topicPartition, status) => + metadata.statusByPartition.foreachEntry { (topicPartition, status) => status.completed = status.futureHolderOpt.isEmpty if (status.futureHolderOpt.isDefined) { status.responseOpt = Some(buildErrorResponse(Errors.REQUEST_TIMED_OUT, topicPartition.partition())) @@ -70,7 +69,7 @@ class DelayedRemoteListOffsets(delayMs: Long, * Call-back to execute when a delayed operation gets expired and hence forced to complete. */ override def onExpiration(): Unit = { - metadata.statusByPartition.forKeyValue { (topicPartition, status) => + metadata.statusByPartition.foreachEntry { (topicPartition, status) => if (!status.completed) { debug(s"Expiring list offset request for partition $topicPartition with status $status") status.futureHolderOpt.foreach(futureHolder => futureHolder.jobFuture.cancel(true)) @@ -100,7 +99,7 @@ class DelayedRemoteListOffsets(delayMs: Long, */ override def tryComplete(): Boolean = { var completable = true - metadata.statusByPartition.forKeyValue { (partition, status) => + metadata.statusByPartition.foreachEntry { (partition, status) => if (!status.completed) { status.futureHolderOpt.foreach { futureHolder => if (futureHolder.taskFuture.isDone) { diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala index ffc338f7d18..f93f4c82349 100755 --- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala +++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala @@ -26,7 +26,6 @@ import kafka.log.{LogCleaner, LogManager} import kafka.network.{DataPlaneAcceptor, SocketServer} import kafka.server.DynamicBrokerConfig._ import kafka.utils.{CoreUtils, Logging} -import kafka.utils.Implicits._ import kafka.zk.{AdminZkClient, KafkaZkClient} import org.apache.kafka.common.Reconfigurable import org.apache.kafka.common.config.internals.BrokerSecurityConfigs @@ -399,7 +398,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging props.setProperty(configName, passwordEncoder.encode(new Password(value))) } } - configProps.asScala.forKeyValue { (name, value) => + configProps.asScala.foreachEntry { (name, value) => if (isPasswordConfig(name)) encodePassword(name, value) } @@ -436,7 +435,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging } } - props.asScala.forKeyValue { (name, value) => + props.asScala.foreachEntry { (name, value) => if (isPasswordConfig(name)) decodePassword(name, value) } @@ -451,7 +450,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging val props = persistentProps.clone().asInstanceOf[Properties] if (props.asScala.keySet.exists(isPasswordConfig)) { maybeCreatePasswordEncoder(kafkaConfig.passwordEncoderOldSecret).foreach { passwordDecoder => - persistentProps.asScala.forKeyValue { (configName, value) => + persistentProps.asScala.foreachEntry { (configName, value) => if (isPasswordConfig(configName) && value != null) { val decoded = try { Some(passwordDecoder.decode(value).value) @@ -545,7 +544,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging * `props` (even though `log.roll.hours` is secondary to `log.roll.ms`). */ private def overrideProps(props: mutable.Map[String, String], propsOverride: mutable.Map[String, String]): Unit = { - propsOverride.forKeyValue { (k, v) => + propsOverride.foreachEntry { (k, v) => // Remove synonyms of `k` to ensure the right precedence is applied. But disable `matchListenerOverride` // so that base configs corresponding to listener configs are not removed. Base configs should not be removed // since they may be used by other listeners. It is ok to retain them in `props` since base configs cannot be @@ -916,7 +915,7 @@ class DynamicMetricReporterState(brokerId: Int, config: KafkaConfig, metrics: Me updatedConfigs: util.Map[String, _]): Unit = { val props = new util.HashMap[String, AnyRef] updatedConfigs.forEach((k, v) => props.put(k, v.asInstanceOf[AnyRef])) - propsOverride.forKeyValue((k, v) => props.put(k, v)) + propsOverride.foreachEntry((k, v) => props.put(k, v)) val reporters = dynamicConfig.currentKafkaConfig.getConfiguredInstances(reporterClasses, classOf[MetricsReporter], props) // Call notifyMetricsReporters first to satisfy the contract for MetricsReporter.contextChange, @@ -1058,7 +1057,7 @@ class DynamicListenerConfig(server: KafkaBroker) extends BrokerReconfigurable wi newAdvertisedListeners: Map[ListenerName, EndPoint] ): Boolean = { if (oldAdvertisedListeners.size != newAdvertisedListeners.size) return true - oldAdvertisedListeners.forKeyValue { + oldAdvertisedListeners.foreachEntry { case (oldListenerName, oldEndpoint) => newAdvertisedListeners.get(oldListenerName) match { case None => return true diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 2f508ef6dc7..c2f24381258 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -24,7 +24,6 @@ import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota} import kafka.server.handlers.DescribeTopicPartitionsRequestHandler import kafka.server.metadata.{ConfigRepository, KRaftMetadataCache} import kafka.server.share.SharePartitionManager -import kafka.utils.Implicits._ import kafka.utils.{CoreUtils, Logging} import org.apache.kafka.admin.AdminUtils import org.apache.kafka.clients.CommonClientConfigs @@ -344,7 +343,7 @@ class KafkaApis(val requestChannel: RequestChannel, partitionStates) // Clear the coordinator caches in case we were the leader. In the case of a reassignment, we // cannot rely on the LeaderAndIsr API for this since it is only sent to active replicas. - result.forKeyValue { (topicPartition, error) => + result.foreachEntry { (topicPartition, error) => if (error == Errors.NONE) { val partitionState = partitionStates(topicPartition) if (topicPartition.topic == GROUP_METADATA_TOPIC_NAME @@ -662,7 +661,7 @@ class KafkaApis(val requestChannel: RequestChannel, var errorInResponse = false val nodeEndpoints = new mutable.HashMap[Int, Node] - mergedResponseStatus.forKeyValue { (topicPartition, status) => + mergedResponseStatus.foreachEntry { (topicPartition, status) => if (status.error != Errors.NONE) { errorInResponse = true debug("Produce request with correlation id %d from client %s on partition %s failed due to %s".format( @@ -732,7 +731,7 @@ class KafkaApis(val requestChannel: RequestChannel, } def processingStatsCallback(processingStats: FetchResponseStats): Unit = { - processingStats.forKeyValue { (tp, info) => + processingStats.foreachEntry { (tp, info) => updateRecordConversionStats(request, tp, info) } } @@ -2208,7 +2207,7 @@ class KafkaApis(val requestChannel: RequestChannel, // the callback for sending a DeleteRecordsResponse def sendResponseCallback(authorizedTopicResponses: Map[TopicPartition, DeleteRecordsPartitionResult]): Unit = { val mergedResponseStatus = authorizedTopicResponses ++ unauthorizedTopicResponses ++ nonExistingTopicResponses - mergedResponseStatus.forKeyValue { (topicPartition, status) => + mergedResponseStatus.foreachEntry { (topicPartition, status) => if (status.errorCode != Errors.NONE.code) { debug("DeleteRecordsRequest with correlation id %d from client %s on partition %s failed due to %s".format( request.header.correlationId, @@ -2485,7 +2484,7 @@ class KafkaApis(val requestChannel: RequestChannel, entriesPerPartition = controlRecords, requestLocal = requestLocal, responseCallback = errors => { - errors.forKeyValue { (tp, partitionResponse) => + errors.foreachEntry { (tp, partitionResponse) => markerResults.put(tp, partitionResponse.error) } maybeComplete() @@ -3328,11 +3327,11 @@ class KafkaApis(val requestChannel: RequestChannel, val electionResults = new util.ArrayList[ReplicaElectionResult]() adjustedResults .groupBy { case (tp, _) => tp.topic } - .forKeyValue { (topic, ps) => + .foreachEntry { (topic, ps) => val electionResult = new ReplicaElectionResult() electionResult.setTopic(topic) - ps.forKeyValue { (topicPartition, error) => + ps.foreachEntry { (topicPartition, error) => val partitionResult = new PartitionResult() partitionResult.setPartitionId(topicPartition.partition) partitionResult.setErrorCode(error.error.code) diff --git a/core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala b/core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala index edf514cb77e..3cdff49d408 100644 --- a/core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala +++ b/core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala @@ -21,7 +21,6 @@ import kafka.cluster.BrokerEndPoint import java.util.{Collections, Optional} import kafka.server.AbstractFetcherThread.{ReplicaFetch, ResultWithPartitions} -import kafka.utils.Implicits.MapExtensionMethods import kafka.utils.Logging import org.apache.kafka.clients.FetchSessionHandler import org.apache.kafka.common.errors.KafkaStorageException @@ -141,7 +140,7 @@ class RemoteLeaderEndPoint(logPrefix: String, } val topics = new OffsetForLeaderTopicCollection(partitions.size) - partitions.forKeyValue { (topicPartition, epochData) => + partitions.foreachEntry { (topicPartition, epochData) => var topic = topics.find(topicPartition.topic) if (topic == null) { topic = new OffsetForLeaderTopic().setTopic(topicPartition.topic) @@ -182,7 +181,7 @@ class RemoteLeaderEndPoint(logPrefix: String, val partitionsWithError = mutable.Set[TopicPartition]() val builder = fetchSessionHandler.newBuilder(partitions.size, false) - partitions.forKeyValue { (topicPartition, fetchState) => + partitions.foreachEntry { (topicPartition, fetchState) => // We will not include a replica in the fetch request if it should be throttled. if (fetchState.isReadyForFetch && !shouldFollowerThrottle(quota, fetchState, topicPartition)) { try { diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 58873cad312..65da75c8865 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -25,7 +25,6 @@ import kafka.server.HostedPartition.Online import kafka.server.QuotaFactory.QuotaManagers import kafka.server.ReplicaManager.{AtMinIsrPartitionCountMetricName, FailedIsrUpdatesPerSecMetricName, IsrExpandsPerSecMetricName, IsrShrinksPerSecMetricName, LeaderCountMetricName, OfflineReplicaCountMetricName, PartitionCountMetricName, PartitionsWithLateTransactionsCountMetricName, ProducerIdCountMetricName, ReassigningPartitionsMetricName, UnderMinIsrPartitionCountMetricName, UnderReplicatedPartitionsMetricName, createLogReadResult, isListOffsetsTimestampUnsupported} import kafka.server.metadata.ZkMetadataCache -import kafka.utils.Implicits._ import kafka.utils._ import kafka.zk.KafkaZkClient import org.apache.kafka.common.errors._ @@ -436,7 +435,7 @@ class ReplicaManager(val config: KafkaConfig, // First, stop the partitions. This will shutdown the fetchers and other managers val partitionsToStop = strayPartitions.map(tp => StopPartition(tp, deleteLocalLog = false)).toSet - stopPartitions(partitionsToStop).forKeyValue { (topicPartition, exception) => + stopPartitions(partitionsToStop).foreachEntry { (topicPartition, exception) => error(s"Unable to stop stray partition $topicPartition", exception) } @@ -512,7 +511,7 @@ class ReplicaManager(val config: KafkaConfig, stateChangeLogger.info(s"Handling StopReplica request correlationId $correlationId from controller " + s"$controllerId for ${partitionStates.size} partitions") if (stateChangeLogger.isTraceEnabled) - partitionStates.forKeyValue { (topicPartition, partitionState) => + partitionStates.foreachEntry { (topicPartition, partitionState) => stateChangeLogger.trace(s"Received StopReplica request $partitionState " + s"correlation id $correlationId from controller $controllerId " + s"epoch $controllerEpoch for partition $topicPartition") @@ -529,7 +528,7 @@ class ReplicaManager(val config: KafkaConfig, this.controllerEpoch = controllerEpoch val stoppedPartitions = mutable.Buffer.empty[StopPartition] - partitionStates.forKeyValue { (topicPartition, partitionState) => + partitionStates.foreachEntry { (topicPartition, partitionState) => val deletePartition = partitionState.deletePartition() getPartition(topicPartition) match { @@ -836,7 +835,7 @@ class ReplicaManager(val config: KafkaConfig, val transactionalProducerInfo = mutable.HashSet[(Long, Short)]() val topicPartitionBatchInfo = mutable.Map[TopicPartition, Int]() - entriesPerPartition.forKeyValue { (topicPartition, records) => + entriesPerPartition.foreachEntry { (topicPartition, records) => // Produce requests (only requests that require verification) should only have one batch per partition in "batches" but check all just to be safe. val transactionalBatches = records.batches.asScala.filter(batch => batch.hasProducerId && batch.isTransactional) transactionalBatches.foreach(batch => transactionalProducerInfo.add(batch.producerId, batch.producerEpoch)) @@ -2203,14 +2202,14 @@ class ReplicaManager(val config: KafkaConfig, val data = new LeaderAndIsrResponseData().setErrorCode(Errors.NONE.code) if (leaderAndIsrRequest.version < 5) { - responseMap.forKeyValue { (tp, error) => + responseMap.foreachEntry { (tp, error) => data.partitionErrors.add(new LeaderAndIsrPartitionError() .setTopicName(tp.topic) .setPartitionIndex(tp.partition) .setErrorCode(error.code)) } } else { - responseMap.forKeyValue { (tp, error) => + responseMap.foreachEntry { (tp, error) => val topicId = topicIds.get(tp.topic) var topic = data.topics.find(topicId) if (topic == null) { @@ -2335,7 +2334,7 @@ class ReplicaManager(val config: KafkaConfig, s"controller $controllerId epoch $controllerEpoch as part of the become-leader transition for " + s"${partitionStates.size} partitions") // Update the partition information to be the leader - partitionStates.forKeyValue { (partition, partitionState) => + partitionStates.foreachEntry { (partition, partitionState) => try { if (partition.makeLeader(partitionState, highWatermarkCheckpoints, topicIds(partitionState.topicName))) { partitionsToMakeLeaders += partition @@ -2400,7 +2399,7 @@ class ReplicaManager(val config: KafkaConfig, highWatermarkCheckpoints: OffsetCheckpoints, topicIds: String => Option[Uuid]) : Set[Partition] = { val traceLoggingEnabled = stateChangeLogger.isTraceEnabled - partitionStates.forKeyValue { (partition, partitionState) => + partitionStates.foreachEntry { (partition, partitionState) => if (traceLoggingEnabled) stateChangeLogger.trace(s"Handling LeaderAndIsr request correlationId $correlationId from controller $controllerId " + s"epoch $controllerEpoch starting the become-follower transition for partition ${partition.topicPartition} with leader " + @@ -2410,7 +2409,7 @@ class ReplicaManager(val config: KafkaConfig, val partitionsToMakeFollower: mutable.Set[Partition] = mutable.Set() try { - partitionStates.forKeyValue { (partition, partitionState) => + partitionStates.foreachEntry { (partition, partitionState) => val newLeaderBrokerId = partitionState.leader try { if (metadataCache.hasAliveBroker(newLeaderBrokerId)) { @@ -2868,7 +2867,7 @@ class ReplicaManager(val config: KafkaConfig, } .toSet stateChangeLogger.info(s"Deleting ${deletes.size} partition(s).") - stopPartitions(deletes).forKeyValue { (topicPartition, e) => + stopPartitions(deletes).foreachEntry { (topicPartition, e) => if (e.isInstanceOf[KafkaStorageException]) { stateChangeLogger.error(s"Unable to delete replica $topicPartition because " + "the local replica for the partition is in an offline log directory") @@ -2918,7 +2917,7 @@ class ReplicaManager(val config: KafkaConfig, stateChangeLogger.info(s"Transitioning ${localLeaders.size} partition(s) to " + "local leaders.") replicaFetcherManager.removeFetcherForPartitions(localLeaders.keySet) - localLeaders.forKeyValue { (tp, info) => + localLeaders.foreachEntry { (tp, info) => getOrCreatePartition(tp, delta, info.topicId).foreach { case (partition, isNew) => try { val state = info.partition.toLeaderAndIsrPartitionState(tp, isNew) @@ -2953,7 +2952,7 @@ class ReplicaManager(val config: KafkaConfig, val partitionsToStartFetching = new mutable.HashMap[TopicPartition, Partition] val partitionsToStopFetching = new mutable.HashMap[TopicPartition, Boolean] val followerTopicSet = new mutable.HashSet[String] - localFollowers.forKeyValue { (tp, info) => + localFollowers.foreachEntry { (tp, info) => getOrCreatePartition(tp, delta, info.topicId).foreach { case (partition, isNew) => try { followerTopicSet.add(tp.topic) @@ -3006,7 +3005,7 @@ class ReplicaManager(val config: KafkaConfig, val listenerName = config.interBrokerListenerName.value val partitionAndOffsets = new mutable.HashMap[TopicPartition, InitialFetchState] - partitionsToStartFetching.forKeyValue { (topicPartition, partition) => + partitionsToStartFetching.foreachEntry { (topicPartition, partition) => val nodeOpt = partition.leaderReplicaIdOpt .flatMap(leaderId => Option(newImage.cluster.broker(leaderId))) .flatMap(_.node(listenerName).asScala) diff --git a/core/src/main/scala/kafka/server/ZkAdminManager.scala b/core/src/main/scala/kafka/server/ZkAdminManager.scala index 748a0776462..238c7304ad4 100644 --- a/core/src/main/scala/kafka/server/ZkAdminManager.scala +++ b/core/src/main/scala/kafka/server/ZkAdminManager.scala @@ -22,7 +22,6 @@ import kafka.common.TopicAlreadyMarkedForDeletionException import kafka.server.ConfigAdminManager.{prepareIncrementalConfigs, toLoggableProps} import kafka.server.metadata.ZkConfigRepository import kafka.utils._ -import kafka.utils.Implicits._ import kafka.zk.{AdminZkClient, KafkaZkClient} import org.apache.kafka.admin.AdminUtils import org.apache.kafka.clients.admin.{AlterConfigOp, ScramMechanism} @@ -963,7 +962,7 @@ class ZkAdminManager(val config: KafkaConfig, } ).toMap - illegalRequestsByUser.forKeyValue { (user, errorMessage) => + illegalRequestsByUser.foreachEntry { (user, errorMessage) => retval.results.add(new AlterUserScramCredentialsResult().setUser(user) .setErrorCode(if (errorMessage == unknownScramMechanismMsg) {Errors.UNSUPPORTED_SASL_MECHANISM.code} else {Errors.UNACCEPTABLE_CREDENTIAL.code}) .setErrorMessage(errorMessage)) } @@ -1028,7 +1027,7 @@ class ZkAdminManager(val config: KafkaConfig, }).collect { case (user: String, exception: Exception) => (user, exception) }.toMap // report failures - usersFailedToPrepareProperties.++(usersFailedToPersist).forKeyValue { (user, exception) => + usersFailedToPrepareProperties.++(usersFailedToPersist).foreachEntry { (user, exception) => val error = Errors.forException(exception) retval.results.add(new AlterUserScramCredentialsResult() .setUser(user) diff --git a/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala b/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala index 7297b87c593..ca41ebda7f3 100755 --- a/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala +++ b/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala @@ -27,7 +27,6 @@ import kafka.controller.StateChangeLogger import kafka.server.{BrokerFeatures, CachedControllerId, KRaftCachedControllerId, MetadataCache, ZkCachedControllerId} import kafka.utils.CoreUtils._ import kafka.utils.Logging -import kafka.utils.Implicits._ import org.apache.kafka.admin.BrokerMetadata import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataPartitionState, UpdateMetadataTopicState} @@ -74,7 +73,7 @@ object ZkMetadataCache { val topicIdToNewState = new util.HashMap[Uuid, UpdateMetadataTopicState]() requestTopicStates.forEach(state => topicIdToNewState.put(state.topicId(), state)) val newRequestTopicStates = new util.ArrayList[UpdateMetadataTopicState]() - currentMetadata.topicNames.forKeyValue((id, name) => { + currentMetadata.topicNames.foreachEntry((id, name) => { try { Option(topicIdToNewState.get(id)) match { case None => @@ -560,7 +559,7 @@ class ZkMetadataCache( } else { //since kafka may do partial metadata updates, we start by copying the previous state val partitionStates = new mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataPartitionState]](metadataSnapshot.partitionStates.size) - metadataSnapshot.partitionStates.forKeyValue { (topic, oldPartitionStates) => + metadataSnapshot.partitionStates.foreachEntry { (topic, oldPartitionStates) => val copy = new mutable.LongMap[UpdateMetadataPartitionState](oldPartitionStates.size) copy ++= oldPartitionStates partitionStates(topic) = copy diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala index 57f84e39825..d4c49251232 100755 --- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala +++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala @@ -23,7 +23,6 @@ import java.io._ import com.fasterxml.jackson.databind.node.{IntNode, JsonNodeFactory, ObjectNode, TextNode} import kafka.coordinator.transaction.TransactionLog import kafka.log._ -import kafka.utils.Implicits._ import kafka.utils.{CoreUtils, VerifiableProperties} import org.apache.kafka.clients.consumer.internals.ConsumerProtocol import org.apache.kafka.common.message.ConsumerProtocolAssignment @@ -90,7 +89,7 @@ object DumpLogSegments { } } - misMatchesForIndexFilesMap.forKeyValue { (fileName, listOfMismatches) => + misMatchesForIndexFilesMap.foreachEntry { (fileName, listOfMismatches) => System.err.println(s"Mismatches in :$fileName") listOfMismatches.foreach { case (indexOffset, logOffset) => System.err.println(s" Index offset: $indexOffset, log offset: $logOffset") @@ -99,7 +98,7 @@ object DumpLogSegments { timeIndexDumpErrors.printErrors() - nonConsecutivePairsForLogFilesMap.forKeyValue { (fileName, listOfNonConsecutivePairs) => + nonConsecutivePairsForLogFilesMap.foreachEntry { (fileName, listOfNonConsecutivePairs) => System.err.println(s"Non-consecutive offsets in $fileName") listOfNonConsecutivePairs.foreach { case (first, second) => System.err.println(s" $first is followed by $second") diff --git a/core/src/main/scala/kafka/utils/CoreUtils.scala b/core/src/main/scala/kafka/utils/CoreUtils.scala index 710bbddbd50..29c89c29898 100755 --- a/core/src/main/scala/kafka/utils/CoreUtils.scala +++ b/core/src/main/scala/kafka/utils/CoreUtils.scala @@ -36,7 +36,6 @@ import org.apache.kafka.network.SocketServerConfigs import org.slf4j.event.Level import java.util -import scala.annotation.nowarn import scala.jdk.CollectionConverters._ /** @@ -260,13 +259,6 @@ object CoreUtils { } } - @nowarn("cat=unused") // see below for explanation - def groupMapReduce[T, K, B](elements: Iterable[T])(key: T => K)(f: T => B)(reduce: (B, B) => B): Map[K, B] = { - // required for Scala 2.12 compatibility, unused in Scala 2.13 and hence we need to suppress the unused warning - import scala.collection.compat._ - elements.groupMapReduce(key)(f)(reduce) - } - def replicaToBrokerAssignmentAsScala(map: util.Map[Integer, util.List[Integer]]): Map[Int, Seq[Int]] = { map.asScala.map(e => (e._1.asInstanceOf[Int], e._2.asScala.map(_.asInstanceOf[Int]))) } diff --git a/core/src/main/scala/kafka/utils/Implicits.scala b/core/src/main/scala/kafka/utils/Implicits.scala index fbd22ec6b8c..80335992051 100644 --- a/core/src/main/scala/kafka/utils/Implicits.scala +++ b/core/src/main/scala/kafka/utils/Implicits.scala @@ -20,7 +20,6 @@ package kafka.utils import java.util import java.util.Properties -import scala.annotation.nowarn import scala.jdk.CollectionConverters._ /** @@ -46,22 +45,4 @@ object Implicits { (properties: util.Hashtable[AnyRef, AnyRef]).putAll(map.asJava) } - - /** - * Exposes `forKeyValue` which maps to `foreachEntry` in Scala 2.13 and `foreach` in Scala 2.12 - * (with the help of scala.collection.compat). `foreachEntry` avoids the tuple allocation and - * is more efficient. - * - * This was not named `foreachEntry` to avoid `unused import` warnings in Scala 2.13 (the implicit - * would not be triggered in Scala 2.13 since `Map.foreachEntry` would have precedence). - */ - @nowarn("cat=unused-imports") - implicit class MapExtensionMethods[K, V](private val self: scala.collection.Map[K, V]) extends AnyVal { - import scala.collection.compat._ - def forKeyValue[U](f: (K, V) => U): Unit = { - self.foreachEntry { (k, v) => f(k, v) } - } - - } - } diff --git a/core/src/main/scala/kafka/utils/json/DecodeJson.scala b/core/src/main/scala/kafka/utils/json/DecodeJson.scala index 71bfd61cee1..9c7bec0bdd1 100644 --- a/core/src/main/scala/kafka/utils/json/DecodeJson.scala +++ b/core/src/main/scala/kafka/utils/json/DecodeJson.scala @@ -17,10 +17,8 @@ package kafka.utils.json -import scala.collection.{Map, Seq} -import scala.collection.compat._ +import scala.collection.{Factory, Map, Seq} import scala.jdk.CollectionConverters._ - import com.fasterxml.jackson.databind.{JsonMappingException, JsonNode} /** diff --git a/core/src/main/scala/kafka/zk/AdminZkClient.scala b/core/src/main/scala/kafka/zk/AdminZkClient.scala index 15c95b998b6..290802c5d11 100644 --- a/core/src/main/scala/kafka/zk/AdminZkClient.scala +++ b/core/src/main/scala/kafka/zk/AdminZkClient.scala @@ -22,7 +22,6 @@ import kafka.common.TopicAlreadyMarkedForDeletionException import kafka.controller.ReplicaAssignment import kafka.server.{DynamicConfig, KafkaConfig} import kafka.utils._ -import kafka.utils.Implicits._ import org.apache.kafka.admin.{AdminUtils, BrokerMetadata} import org.apache.kafka.common.{TopicPartition, Uuid} import org.apache.kafka.common.errors._ @@ -311,7 +310,7 @@ class AdminZkClient(zkClient: KafkaZkClient, expectedReplicationFactor: Int, availableBrokerIds: Set[Int]): Unit = { - replicaAssignment.forKeyValue { (partitionId, replicas) => + replicaAssignment.foreachEntry { (partitionId, replicas) => if (replicas.isEmpty) throw new InvalidReplicaAssignmentException( s"Cannot have replication factor of 0 for partition id $partitionId.") diff --git a/core/src/test/java/kafka/admin/ConfigCommandTest.java b/core/src/test/java/kafka/admin/ConfigCommandTest.java index 3535374b6e0..5ccfa525a6d 100644 --- a/core/src/test/java/kafka/admin/ConfigCommandTest.java +++ b/core/src/test/java/kafka/admin/ConfigCommandTest.java @@ -79,8 +79,8 @@ import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; -import scala.collection.JavaConverters; import scala.collection.Seq; +import scala.jdk.javaapi.CollectionConverters; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -1004,7 +1004,6 @@ public class ConfigCommandTest { }); } - @SuppressWarnings("deprecation") // Added for Scala 2.12 compatibility for usages of JavaConverters @ParameterizedTest @ValueSource(booleans = {true, false}) public void shouldAlterTopicConfig(boolean file) { @@ -1013,7 +1012,7 @@ public class ConfigCommandTest { addedConfigs.put("delete.retention.ms", "1000000"); addedConfigs.put("min.insync.replicas", "2"); if (file) { - File f = kafka.utils.TestUtils.tempPropertiesFile(JavaConverters.mapAsScalaMap(addedConfigs)); + File f = kafka.utils.TestUtils.tempPropertiesFile(CollectionConverters.asScala(addedConfigs)); filePath = f.getPath(); } @@ -2292,8 +2291,7 @@ public class ConfigCommandTest { } } - @SuppressWarnings({"deprecation"}) private Seq seq(Collection seq) { - return JavaConverters.asScalaIteratorConverter(seq.iterator()).asScala().toSeq(); + return CollectionConverters.asScala(seq).toSeq(); } } diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala index 26ebed6bd86..ad4439faf54 100644 --- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala @@ -26,8 +26,7 @@ import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.{Arguments, MethodSource} -import java.util -import java.util.Properties +import java.util.{Properties, stream} import java.util.concurrent.atomic.AtomicInteger import scala.jdk.CollectionConverters._ import scala.collection.Seq @@ -117,38 +116,35 @@ object BaseConsumerTest { // * KRaft and the classic group protocol // * KRaft and the consumer group protocol def getTestQuorumAndGroupProtocolParametersAll() : java.util.stream.Stream[Arguments] = { - util.Arrays.stream(Array( - Arguments.of("zk", "classic"), - Arguments.of("kraft", "classic"), - Arguments.of("kraft", "consumer") - )) + stream.Stream.of( + Arguments.of("zk", "classic"), + Arguments.of("kraft", "classic"), + Arguments.of("kraft", "consumer") + ) } - // In Scala 2.12, it is necessary to disambiguate the java.util.stream.Stream.of() method call - // in the case where there's only a single Arguments in the list. The following commented-out - // method works in Scala 2.13, but not 2.12. For this reason, tests which run against just a - // single combination are written using @CsvSource rather than the more elegant @MethodSource. - // def getTestQuorumAndGroupProtocolParametersZkOnly() : java.util.stream.Stream[Arguments] = { - // java.util.stream.Stream.of( - // Arguments.of("zk", "classic")) - // } + def getTestQuorumAndGroupProtocolParametersZkOnly() : java.util.stream.Stream[Arguments] = { + stream.Stream.of( + Arguments.of("zk", "classic") + ) + } // For tests that only work with the classic group protocol, we want to test the following combinations: // * ZooKeeper and the classic group protocol // * KRaft and the classic group protocol def getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly() : java.util.stream.Stream[Arguments] = { - util.Arrays.stream(Array( - Arguments.of("zk", "classic"), - Arguments.of("kraft", "classic") - )) + stream.Stream.of( + Arguments.of("zk", "classic"), + Arguments.of("kraft", "classic") + ) } // For tests that only work with the consumer group protocol, we want to test the following combination: // * KRaft and the consumer group protocol - def getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly(): java.util.stream.Stream[Arguments] = { - util.Arrays.stream(Array( - Arguments.of("kraft", "consumer") - )) + def getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly(): stream.Stream[Arguments] = { + stream.Stream.of( + Arguments.of("kraft", "consumer") + ) } val updateProducerCount = new AtomicInteger() diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerAssignTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerAssignTest.scala index 219ed9c2a1e..b4bc6f60a8e 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerAssignTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerAssignTest.scala @@ -25,7 +25,6 @@ import org.apache.kafka.common.PartitionInfo import java.util.stream.Stream import scala.jdk.CollectionConverters._ import scala.collection.mutable -import org.junit.jupiter.params.provider.CsvSource /** * Integration tests for the consumer that covers logic related to manual assignment. @@ -137,9 +136,7 @@ class PlaintextConsumerAssignTest extends AbstractConsumerTest { // partitionsFor not implemented in consumer group protocol and this test requires ZK also @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) - @CsvSource(Array( - "zk, classic" - )) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersZkOnly")) def testAssignAndConsumeWithLeaderChangeValidatingPositions(quorum: String, groupProtocol: String): Unit = { val numRecords = 10 val producer = createProducer() @@ -243,4 +240,7 @@ class PlaintextConsumerAssignTest extends AbstractConsumerTest { object PlaintextConsumerAssignTest { def getTestQuorumAndGroupProtocolParametersAll: Stream[Arguments] = BaseConsumerTest.getTestQuorumAndGroupProtocolParametersAll() + + def getTestQuorumAndGroupProtocolParametersZkOnly: Stream[Arguments] = + BaseConsumerTest.getTestQuorumAndGroupProtocolParametersZkOnly() } diff --git a/core/src/test/scala/integration/kafka/server/IntegrationTestUtils.scala b/core/src/test/scala/integration/kafka/server/IntegrationTestUtils.scala index 7e357eb6e2b..62733e5a6db 100644 --- a/core/src/test/scala/integration/kafka/server/IntegrationTestUtils.scala +++ b/core/src/test/scala/integration/kafka/server/IntegrationTestUtils.scala @@ -23,7 +23,6 @@ import java.nio.ByteBuffer import java.util.{Collections, Optional, Properties} import kafka.network.SocketServer import kafka.security.JaasTestUtils -import kafka.utils.Implicits._ import org.apache.kafka.clients.admin.{Admin, NewTopic} import org.apache.kafka.common.network.{ConnectionMode, ListenerName} import org.apache.kafka.common.protocol.ApiKeys @@ -120,7 +119,7 @@ object IntegrationTestUtils { replicaAssignment: Map[Int, Seq[Int]] ): Unit = { val javaAssignment = new java.util.HashMap[Integer, java.util.List[Integer]]() - replicaAssignment.forKeyValue { (partitionId, assignment) => + replicaAssignment.foreachEntry { (partitionId, assignment) => javaAssignment.put(partitionId, assignment.map(Int.box).asJava) } val newTopic = new NewTopic(topic, javaAssignment) diff --git a/core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala b/core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala index 66971c426f7..97a4b5d7019 100644 --- a/core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala +++ b/core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala @@ -24,7 +24,6 @@ import java.util.{Collections, Properties} import com.yammer.metrics.core.Meter import kafka.network.Processor.ListenerMetricTag import kafka.server.KafkaConfig -import kafka.utils.Implicits.MapExtensionMethods import kafka.utils.TestUtils import org.apache.kafka.common.config.ConfigException import org.apache.kafka.common.metrics.internals.MetricsUtils @@ -760,7 +759,7 @@ class ConnectionQuotasTest { "Expected broker-connection-accept-rate metric to exist") // add listeners and verify connection limits not exceeded - listeners.forKeyValue { (name, listener) => + listeners.foreachEntry { (name, listener) => val listenerName = listener.listenerName connectionQuotas.addListener(config, listenerName) connectionQuotas.maxConnectionsPerListener(listenerName).configure(listenerConfig) @@ -785,14 +784,14 @@ class ConnectionQuotasTest { } private def verifyNoBlockedPercentRecordedOnAllListeners(): Unit = { - blockedPercentMeters.forKeyValue { (name, meter) => + blockedPercentMeters.foreachEntry { (name, meter) => assertEquals(0, meter.count(), s"BlockedPercentMeter metric for $name listener") } } private def verifyNonZeroBlockedPercentAndThrottleTimeOnAllListeners(): Unit = { - blockedPercentMeters.forKeyValue { (name, meter) => + blockedPercentMeters.foreachEntry { (name, meter) => assertTrue(meter.count() > 0, s"Expected BlockedPercentMeter metric for $name listener to be recorded") } @@ -808,7 +807,7 @@ class ConnectionQuotasTest { } private def verifyOnlyNonInterBrokerListenersBlockedPercentRecorded(): Unit = { - blockedPercentMeters.forKeyValue { (name, meter) => + blockedPercentMeters.foreachEntry { (name, meter) => name match { case "REPLICATION" => assertEquals(0, meter.count(), s"BlockedPercentMeter metric for $name listener") diff --git a/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala b/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala index 45086961342..b3f514f9629 100644 --- a/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala @@ -19,7 +19,6 @@ package kafka.server import com.yammer.metrics.core.Gauge import kafka.cluster.BrokerEndPoint import kafka.server.AbstractFetcherThread.{ReplicaFetch, ResultWithPartitions} -import kafka.utils.Implicits.MapExtensionMethods import kafka.utils.TestUtils import org.apache.kafka.common.message.FetchResponseData.PartitionData import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset @@ -255,7 +254,7 @@ class AbstractFetcherManagerTest { fetcherManager.resizeThreadPool(newFetcherSize) val ownedPartitions = mutable.Set.empty[TopicPartition] - fetcherManager.fetcherThreadMap.forKeyValue { (brokerIdAndFetcherId, fetcherThread) => + fetcherManager.fetcherThreadMap.foreachEntry { (brokerIdAndFetcherId, fetcherThread) => val fetcherId = brokerIdAndFetcherId.fetcherId val brokerId = brokerIdAndFetcherId.brokerId diff --git a/core/src/test/scala/unit/kafka/server/AddPartitionsToTxnManagerTest.scala b/core/src/test/scala/unit/kafka/server/AddPartitionsToTxnManagerTest.scala index 3f27059ce8b..20d9b70a1ae 100644 --- a/core/src/test/scala/unit/kafka/server/AddPartitionsToTxnManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/AddPartitionsToTxnManagerTest.scala @@ -18,7 +18,6 @@ package kafka.server import com.yammer.metrics.core.{Histogram, Meter} -import kafka.utils.Implicits.MapExtensionMethods import kafka.utils.TestUtils import org.apache.kafka.clients.{ClientResponse, NetworkClient} import org.apache.kafka.common.errors.{AuthenticationException, SaslAuthenticationException, UnsupportedVersionException} @@ -92,7 +91,7 @@ class AddPartitionsToTxnManagerTest { } private def setErrors(errors: mutable.Map[TopicPartition, Errors])(callbackErrors: Map[TopicPartition, Errors]): Unit = { - callbackErrors.forKeyValue(errors.put) + callbackErrors.foreachEntry(errors.put) } @Test diff --git a/core/src/test/scala/unit/kafka/server/MockLeaderEndPoint.scala b/core/src/test/scala/unit/kafka/server/MockLeaderEndPoint.scala index d2d16718e6f..0d63f872e9d 100644 --- a/core/src/test/scala/unit/kafka/server/MockLeaderEndPoint.scala +++ b/core/src/test/scala/unit/kafka/server/MockLeaderEndPoint.scala @@ -20,7 +20,6 @@ package kafka.server import kafka.cluster.BrokerEndPoint import kafka.server.AbstractFetcherThread.ReplicaFetch import kafka.server.AbstractFetcherThread.ResultWithPartitions -import kafka.utils.Implicits.MapExtensionMethods import org.apache.kafka.common.message.FetchResponseData import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset import org.apache.kafka.common.protocol.{ApiKeys, Errors} @@ -139,7 +138,7 @@ class MockLeaderEndPoint(sourceBroker: BrokerEndPoint = new BrokerEndPoint(1, ho override def fetchEpochEndOffsets(partitions: Map[TopicPartition, EpochData]): Map[TopicPartition, EpochEndOffset] = { val endOffsets = mutable.Map[TopicPartition, EpochEndOffset]() - partitions.forKeyValue { (partition, epochData) => + partitions.foreachEntry { (partition, epochData) => assert(partition.partition == epochData.partition, "Partition must be consistent between TopicPartition and EpochData") val leaderState = leaderPartitionState(partition) diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala index 81fc5ca422b..661a638aa88 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala @@ -1314,7 +1314,6 @@ class ReplicaFetcherThreadTest { when(partition.localLogOrException).thenReturn(log) when(partition.appendRecordsToFollowerOrFutureReplica(any[MemoryRecords], any[Boolean])).thenReturn(appendInfo) - // In Scala 2.12, the partitionsWithNewHighWatermark buffer is cleared before the replicaManager mock is verified. // Capture the argument at the time of invocation. val completeDelayedFetchRequestsArgument = mutable.Buffer.empty[TopicPartition] val replicaManager: ReplicaManager = mock(classOf[ReplicaManager]) diff --git a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala index cb73a53c3d2..0bb1c9661cb 100644 --- a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala @@ -19,7 +19,6 @@ package kafka.server.epoch import kafka.cluster.BrokerEndPoint import kafka.server.KafkaConfig._ import kafka.server._ -import kafka.utils.Implicits._ import kafka.utils.TestUtils._ import kafka.utils.{Logging, TestUtils} import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} @@ -311,7 +310,7 @@ class LeaderEpochIntegrationTest extends QuorumTestHarness with Logging { def leaderOffsetsFor(partitions: Map[TopicPartition, Int]): Map[TopicPartition, EpochEndOffset] = { val topics = new OffsetForLeaderTopicCollection(partitions.size) - partitions.forKeyValue { (topicPartition, leaderEpoch) => + partitions.foreachEntry { (topicPartition, leaderEpoch) => var topic = topics.find(topicPartition.topic) if (topic == null) { topic = new OffsetForLeaderTopic().setTopic(topicPartition.topic) diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 326bb1ed918..15ed5c2c17d 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -421,7 +421,7 @@ object TestUtils extends Logging { topic, numPartitions, replicationFactor.toShort).configs(configsMap))) } else { val assignment = new util.HashMap[Integer, util.List[Integer]]() - replicaAssignment.forKeyValue { case (k, v) => + replicaAssignment.foreachEntry { case (k, v) => val replicas = new util.ArrayList[Integer] v.foreach(r => replicas.add(r.asInstanceOf[Integer])) assignment.put(k.asInstanceOf[Integer], replicas) diff --git a/docs/upgrade.html b/docs/upgrade.html index 96fb97fda8f..4c81bf00d46 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -86,6 +86,10 @@
  • Other changes:
      +
    • + Scala 2.12 support has been removed in Apache Kafka 4.0 + See KIP-751 for more details +
    • The --delete-config option in the kafka-topics command line tool has been deprecated.
    diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index 8bb417a7362..45248cc7e28 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -20,29 +20,22 @@ ext { versions = [:] libs = [:] - - // Available if -PscalaVersion is used. This is useful when we want to support a Scala version that has - // a higher minimum Java requirement than Kafka. This was previously the case for Scala 2.12 and Java 7. - availableScalaVersions = [ '2.12', '2.13' ] } // Add Scala version -def defaultScala212Version = '2.12.19' def defaultScala213Version = '2.13.15' if (hasProperty('scalaVersion')) { - if (scalaVersion == '2.12') { - versions["scala"] = defaultScala212Version - } else if (scalaVersion == '2.13') { + if (scalaVersion == '2.13') { versions["scala"] = defaultScala213Version } else { versions["scala"] = scalaVersion } } else { - versions["scala"] = defaultScala212Version + versions["scala"] = defaultScala213Version } /* Resolve base Scala version according to these patterns: - 1. generally available Scala versions (such as: 2.12.y and 2.13.z) corresponding base versions will be: 2.12 and 2.13 (respectively) + 1. generally available Scala versions (such as: 2.13.z) corresponding base versions will be: 2.13 (respectively) 2. pre-release Scala versions (i.e. milestone/rc, such as: 2.13.0-M5, 2.13.0-RC1, 2.14.0-M1, etc.) will have identical base versions; rationale: pre-release Scala versions are not binary compatible with each other and that's the reason why libraries include the full Scala release string in their name for pre-releases (see dependencies below with an artifact name suffix '_$versions.baseScala') @@ -54,13 +47,9 @@ if ( !versions.scala.contains('-') ) { } // mockito >= 5.5 is required for Java 21 and mockito 5.x requires at least Java 11 -// mockito 4.9 is required for Scala 2.12 as a workaround for compiler errors due to ambiguous reference to `Mockito.spy` -// since Scala 2.12 support is going away soon, this is simpler than adjusting the code. // mockito 4.11 is used with Java 8 and Scala 2.13 String mockitoVersion -if (scalaVersion == "2.12") - mockitoVersion = "4.9.0" -else if (JavaVersion.current().isCompatibleWith(JavaVersion.VERSION_11)) +if (JavaVersion.current().isCompatibleWith(JavaVersion.VERSION_11)) mockitoVersion = "5.10.0" else mockitoVersion = "4.11.0" @@ -146,7 +135,6 @@ versions += [ pcollections: "4.0.1", reload4j: "1.2.25", rocksDB: "7.9.2", - scalaCollectionCompat: "2.10.0", // When updating the scalafmt version please also update the version field in checkstyle/.scalafmt.conf. scalafmt now // has the version field as mandatory in its configuration, see // https://github.com/scalameta/scalafmt/releases/tag/v3.1.0. @@ -246,7 +234,6 @@ libs += [ opentelemetryProto: "io.opentelemetry.proto:opentelemetry-proto:$versions.opentelemetryProto", reload4j: "ch.qos.reload4j:reload4j:$versions.reload4j", rocksDBJni: "org.rocksdb:rocksdbjni:$versions.rocksDB", - scalaCollectionCompat: "org.scala-lang.modules:scala-collection-compat_$versions.baseScala:$versions.scalaCollectionCompat", scalaJava8Compat: "org.scala-lang.modules:scala-java8-compat_$versions.baseScala:$versions.scalaJava8Compat", scalaLibrary: "org.scala-lang:scala-library:$versions.scala", scalaLogging: "com.typesafe.scala-logging:scala-logging_$versions.baseScala:$versions.scalaLogging", diff --git a/gradle/spotbugs-exclude.xml b/gradle/spotbugs-exclude.xml index bf540c4d7aa..e38ed4e21bf 100644 --- a/gradle/spotbugs-exclude.xml +++ b/gradle/spotbugs-exclude.xml @@ -184,68 +184,12 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - diff --git a/gradlewAll b/gradlewAll index c09e9b4a485..21d6d7b7525 100755 --- a/gradlewAll +++ b/gradlewAll @@ -17,5 +17,9 @@ # Convenient way to invoke a gradle command with all Scala versions supported # by default -./gradlew "$@" -PscalaVersion=2.12 && ./gradlew "$@" -PscalaVersion=2.13 +# This script was originally designed to support multiple Scala versions (2.12 and 2.13), +# but as Scala 2.12 is no longer supported, this script is no longer necessary. +# We are keeping it for backwards compatibility. It will be removed in a future release. +echo "Warning: This script is deprecated and will be removed in a future release." +./gradlew "$@" -PscalaVersion=2.13 diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/ConcurrentMapBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/ConcurrentMapBenchmark.java index 776ca9b5816..7d7716cae2e 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/ConcurrentMapBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/ConcurrentMapBenchmark.java @@ -189,4 +189,23 @@ public class ConcurrentMapBenchmark { } } } + + @Benchmark + @OperationsPerInvocation(TIMES) + public void testConcurrentHashMapComputeIfAbsentReadOnly(Blackhole blackhole) { + for (int i = 0; i < TIMES; i++) { + blackhole.consume(concurrentHashMap.computeIfAbsent( + ThreadLocalRandom.current().nextInt(0, mapSize), + newValue -> Integer.MAX_VALUE + )); + } + } + + @Benchmark + @OperationsPerInvocation(TIMES) + public void testConcurrentHashMapGetReadOnly(Blackhole blackhole) { + for (int i = 0; i < TIMES; i++) { + blackhole.consume(concurrentHashMap.get(ThreadLocalRandom.current().nextInt(0, mapSize))); + } + } } diff --git a/release/release.py b/release/release.py index 7710789bba8..67fc46c831d 100644 --- a/release/release.py +++ b/release/release.py @@ -326,7 +326,7 @@ except Exception as e: git.targz(rc_tag, f"kafka-{release_version}-src/", f"{artifacts_dir}/kafka-{release_version}-src.tgz") -cmd("Building artifacts", "./gradlew clean && ./gradlewAll releaseTarGz", cwd=kafka_dir, env=jdk8_env, shell=True) +cmd("Building artifacts", "./gradlew clean && ./gradlew releaseTarGz -PscalaVersion=2.13", cwd=kafka_dir, env=jdk8_env, shell=True) cmd("Copying artifacts", f"cp {kafka_dir}/core/build/distributions/* {artifacts_dir}", shell=True) cmd("Building docs", "./gradlew clean aggregatedJavadoc", cwd=kafka_dir, env=jdk17_env) cmd("Copying docs", f"cp -R {kafka_dir}/build/docs/javadoc {artifacts_dir}") @@ -351,7 +351,7 @@ cmd("Zipping artifacts", f"tar -czf {artifact_name}.tar.gz {artifact_name}", cwd sftp.upload_artifacts(apache_id, artifacts_dir) confirm_or_fail("Going to build and upload mvn artifacts based on these settings:\n" + textfiles.read(global_gradle_props) + '\nOK?') -cmd("Building and uploading archives", "./gradlewAll publish", cwd=kafka_dir, env=jdk8_env, shell=True) +cmd("Building and uploading archives", "./gradlew publish -PscalaVersion=2.13", cwd=kafka_dir, env=jdk8_env, shell=True) cmd("Building and uploading archives", "mvn deploy -Pgpg-signing", cwd=os.path.join(kafka_dir, "streams/quickstart"), env=jdk8_env, shell=True) # TODO: Many of these suggested validation steps could be automated diff --git a/test-common/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java b/test-common/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java index b9cbb9f3a76..08b12ff442a 100644 --- a/test-common/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java +++ b/test-common/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java @@ -77,7 +77,6 @@ import static org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_LIS import static org.apache.kafka.server.config.ServerLogConfigs.LOG_DIRS_CONFIG; -@SuppressWarnings("deprecation") // Needed for Scala 2.12 compatibility public class KafkaClusterTestKit implements AutoCloseable { private static final Logger log = LoggerFactory.getLogger(KafkaClusterTestKit.class);