From 582bb48e8863523818b9d3a48031a3fc705e8914 Mon Sep 17 00:00:00 2001 From: TengYao Chi Date: Tue, 15 Oct 2024 13:34:21 +0800 Subject: [PATCH] KAFKA-17748 Remove scala-java8-compat (#17497) Reviewers: Chia-Ping Tsai --- LICENSE-binary | 1 - build.gradle | 4 ---- .../log/remote/RemoteLogOffsetReader.java | 2 +- .../server/builders/KafkaApisBuilder.java | 3 ++- .../builders/ReplicaManagerBuilder.java | 3 ++- .../src/main/scala/kafka/cluster/Broker.scala | 4 ++-- .../transaction/ProducerIdManager.scala | 4 ++-- core/src/main/scala/kafka/log/LocalLog.scala | 4 ++-- .../src/main/scala/kafka/log/UnifiedLog.scala | 16 +++++++-------- .../scala/kafka/network/RequestChannel.scala | 4 ++-- .../scala/kafka/raft/KafkaMetadataLog.scala | 20 +++++++++---------- .../kafka/server/AbstractFetcherThread.scala | 6 +++--- .../src/main/scala/kafka/server/AclApis.scala | 4 ++-- .../kafka/server/AlterPartitionManager.scala | 4 ++-- .../server/AutoTopicCreationManager.scala | 4 ++-- .../scala/kafka/server/BrokerServer.scala | 6 +++--- .../scala/kafka/server/ConfigHelper.scala | 6 +++--- .../scala/kafka/server/ControllerServer.scala | 8 ++++---- .../scala/kafka/server/EnvelopeUtils.scala | 5 +++-- .../kafka/server/ForwardingManager.scala | 6 +++--- .../main/scala/kafka/server/KafkaConfig.scala | 6 +++--- .../main/scala/kafka/server/KafkaServer.scala | 4 ++-- .../kafka/server/LocalLeaderEndPoint.scala | 4 ++-- .../NodeToControllerChannelManager.scala | 6 +++--- .../kafka/server/RemoteLeaderEndPoint.scala | 6 +++--- .../scala/kafka/server/ReplicaManager.scala | 12 +++++------ .../metadata/ClientQuotaMetadataManager.scala | 7 ++++--- .../server/metadata/KRaftMetadataCache.scala | 10 +++++----- .../kafka/api/BaseAdminIntegrationTest.scala | 6 +++--- .../kafka/api/BaseProducerSendTest.scala | 2 +- .../DescribeAuthorizedOperationsTest.scala | 3 ++- .../kafka/api/IntegrationTestHarness.scala | 2 +- .../api/PlaintextAdminIntegrationTest.scala | 8 ++++---- ...aslClientsWithInvalidCredentialsTest.scala | 4 +++- ...aslPlainSslEndToEndAuthorizationTest.scala | 2 +- .../integration/kafka/api/SaslSetup.scala | 2 +- .../api/SaslSslAdminIntegrationTest.scala | 4 ++-- .../kafka/api/SslAdminIntegrationTest.scala | 2 +- .../api/SslEndToEndAuthorizationTest.scala | 2 +- .../kafka/server/QuorumTestHarness.scala | 4 ++-- .../kafka/cluster/PartitionLockTest.scala | 4 ++-- .../unit/kafka/cluster/PartitionTest.scala | 4 ++-- .../kafka/log/LogCleanerManagerTest.scala | 4 ++-- .../scala/unit/kafka/log/LogCleanerTest.scala | 4 ++-- .../scala/unit/kafka/log/LogLoaderTest.scala | 20 +++++++++---------- .../scala/unit/kafka/log/UnifiedLogTest.scala | 12 +++++------ .../kafka/network/RequestChannelTest.scala | 6 +++--- .../network/RequestConvertToJsonTest.scala | 8 ++++---- .../AbstractApiVersionsRequestTest.scala | 8 ++++---- ...legationTokenRequestsOnPlainTextTest.scala | 2 +- .../server/DelegationTokenRequestsTest.scala | 2 +- ...nRequestsWithDisableTokenFeatureTest.scala | 2 +- .../kafka/server/MockLeaderEndPoint.scala | 6 +++--- .../kafka/server/ReplicaManagerTest.scala | 6 +++--- .../unit/kafka/utils/SchedulerTest.scala | 5 +++-- .../scala/unit/kafka/utils/TestUtils.scala | 2 +- gradle/dependencies.gradle | 2 -- .../PartitionMakeFollowerBenchmark.java | 2 +- .../UpdateFollowerFetchStateBenchmark.java | 2 +- .../api/RaftClusterInvocationContext.java | 3 ++- 60 files changed, 158 insertions(+), 156 deletions(-) diff --git a/LICENSE-binary b/LICENSE-binary index d7f14c8ccb5..ad13020513d 100644 --- a/LICENSE-binary +++ b/LICENSE-binary @@ 
-259,7 +259,6 @@ rocksdbjni-7.9.2 scala-library-2.13.15 scala-logging_2.13-3.9.5 scala-reflect-2.13.15 -scala-java8-compat_2.13-1.0.2 snappy-java-1.1.10.5 swagger-annotations-2.2.8 zookeeper-3.8.4 diff --git a/build.gradle b/build.gradle index e1157468bb0..ca1fbceb6ac 100644 --- a/build.gradle +++ b/build.gradle @@ -1054,7 +1054,6 @@ project(':core') { implementation libs.joptSimple implementation libs.jose4j implementation libs.metrics - implementation libs.scalaJava8Compat // only needed transitively, but set it explicitly to ensure it has the same version as scala-library implementation libs.scalaReflect implementation libs.scalaLogging @@ -1549,7 +1548,6 @@ project(':test-common') { } dependencies { - implementation libs.scalaJava8Compat implementation project(':core') implementation project(':metadata') implementation project(':server') @@ -1592,7 +1590,6 @@ project(':test-common:test-common-api') { dependencies { - implementation libs.scalaJava8Compat implementation project(':clients') implementation project(':core') implementation project(':group-coordinator') @@ -3300,7 +3297,6 @@ project(':jmh-benchmarks') { implementation libs.mockitoCore implementation libs.slf4jReload4j implementation libs.scalaLibrary - implementation libs.scalaJava8Compat } tasks.withType(JavaCompile) { diff --git a/core/src/main/java/kafka/log/remote/RemoteLogOffsetReader.java b/core/src/main/java/kafka/log/remote/RemoteLogOffsetReader.java index e82a50e9942..09b2c6dccfa 100644 --- a/core/src/main/java/kafka/log/remote/RemoteLogOffsetReader.java +++ b/core/src/main/java/kafka/log/remote/RemoteLogOffsetReader.java @@ -28,7 +28,7 @@ import java.util.function.Consumer; import java.util.function.Supplier; import scala.Option; -import scala.compat.java8.OptionConverters; +import scala.jdk.javaapi.OptionConverters; import scala.util.Either; import scala.util.Left; import scala.util.Right; diff --git a/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java b/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java index c4bc16ed24a..75dc0d7dc9b 100644 --- a/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java +++ b/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java @@ -43,7 +43,8 @@ import org.apache.kafka.storage.log.metrics.BrokerTopicStats; import java.util.Collections; import java.util.Optional; -import scala.compat.java8.OptionConverters; +import scala.jdk.javaapi.OptionConverters; + public class KafkaApisBuilder { diff --git a/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java b/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java index a694aa2a75f..d52dd2bf711 100644 --- a/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java +++ b/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java @@ -46,7 +46,8 @@ import java.util.Collections; import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; -import scala.compat.java8.OptionConverters; +import scala.jdk.javaapi.OptionConverters; + public class ReplicaManagerBuilder { diff --git a/core/src/main/scala/kafka/cluster/Broker.scala b/core/src/main/scala/kafka/cluster/Broker.scala index e5835201fa3..9867636a7cc 100755 --- a/core/src/main/scala/kafka/cluster/Broker.scala +++ b/core/src/main/scala/kafka/cluster/Broker.scala @@ -29,8 +29,8 @@ import org.apache.kafka.metadata.{BrokerRegistration, VersionRange} import org.apache.kafka.server.authorizer.AuthorizerServerInfo import scala.collection.Seq -import scala.compat.java8.OptionConverters._ import 
scala.jdk.CollectionConverters._ +import scala.jdk.OptionConverters.RichOptional object Broker { private[kafka] case class ServerInfo(clusterResource: ClusterResource, @@ -58,7 +58,7 @@ object Broker { new Broker( registration.id(), registration.listeners().values().asScala.map(EndPoint.fromJava).toSeq, - registration.rack().asScala, + registration.rack().toScala, Features.supportedFeatures(supportedFeatures(registration.supportedFeatures())) ) } diff --git a/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala b/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala index 322dccd0dcb..f7470b563b3 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala @@ -29,7 +29,7 @@ import org.apache.kafka.server.{ControllerRequestCompletionHandler, NodeToContro import org.apache.kafka.server.common.ProducerIdsBlock import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong, AtomicReference} -import scala.compat.java8.OptionConverters.RichOptionalGeneric +import scala.jdk.OptionConverters.RichOptional import scala.util.{Failure, Success, Try} /** @@ -180,7 +180,7 @@ class RPCProducerIdManager(brokerId: Int, var result: Try[Long] = null var iteration = 0 while (result == null) { - currentProducerIdBlock.get.claimNextId().asScala match { + currentProducerIdBlock.get.claimNextId().toScala match { case None => // Check the next block if current block is full val block = nextProducerIdBlock.getAndSet(null) diff --git a/core/src/main/scala/kafka/log/LocalLog.scala b/core/src/main/scala/kafka/log/LocalLog.scala index 7a5ad087ecd..09aaed13d68 100644 --- a/core/src/main/scala/kafka/log/LocalLog.scala +++ b/core/src/main/scala/kafka/log/LocalLog.scala @@ -35,8 +35,8 @@ import java.util.regex.Pattern import java.util.{Collections, Optional} import scala.collection.mutable.ListBuffer import scala.collection.Seq -import scala.compat.java8.OptionConverters._ import scala.jdk.CollectionConverters._ +import scala.jdk.OptionConverters.RichOptional /** * An append-only log for storing messages locally. The log is a sequence of LogSegments, each with a base offset. 
@@ -415,7 +415,7 @@ class LocalLog(@volatile private var _dir: File, val startOffsetPosition = new OffsetPosition(fetchInfo.fetchOffsetMetadata.messageOffset, fetchInfo.fetchOffsetMetadata.relativePositionInSegment) val upperBoundOffset = segment.fetchUpperBoundOffset(startOffsetPosition, fetchSize).orElse( - segments.higherSegment(segment.baseOffset).asScala.map(s => s.baseOffset).getOrElse(logEndOffset)) + segments.higherSegment(segment.baseOffset).toScala.map(s => s.baseOffset).getOrElse(logEndOffset)) val abortedTransactions = ListBuffer.empty[FetchResponseData.AbortedTransaction] def accumulator(abortedTxns: Seq[AbortedTxn]): Unit = abortedTransactions ++= abortedTxns.map(_.asAbortedTransaction) diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala index 1a747530ff4..4f9bf09ae13 100644 --- a/core/src/main/scala/kafka/log/UnifiedLog.scala +++ b/core/src/main/scala/kafka/log/UnifiedLog.scala @@ -41,7 +41,7 @@ import org.apache.kafka.server.util.Scheduler import org.apache.kafka.storage.internals.checkpoint.{LeaderEpochCheckpointFile, PartitionMetadataFile} import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache import org.apache.kafka.storage.internals.log.LocalLog.SplitSegmentResult -import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, BatchMetadata, CompletedTxn, FetchDataInfo, LastRecord, LeaderHwChange, LocalLog => JLocalLog, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogLoader, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, LogValidator, ProducerAppendInfo, ProducerStateManager, ProducerStateManagerConfig, RollParams, UnifiedLog => JUnifiedLog, VerificationGuard} +import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, BatchMetadata, CompletedTxn, FetchDataInfo, LastRecord, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogLoader, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, LogValidator, ProducerAppendInfo, ProducerStateManager, ProducerStateManagerConfig, RollParams, VerificationGuard, LocalLog => JLocalLog, UnifiedLog => JUnifiedLog} import org.apache.kafka.storage.log.metrics.{BrokerTopicMetrics, BrokerTopicStats} import java.io.{File, IOException} @@ -54,8 +54,8 @@ import java.util.{Collections, Optional, OptionalInt, OptionalLong} import scala.annotation.nowarn import scala.collection.mutable.{ArrayBuffer, ListBuffer} import scala.collection.{Seq, mutable} -import scala.compat.java8.OptionConverters._ import scala.jdk.CollectionConverters._ +import scala.jdk.OptionConverters.{RichOption, RichOptional, RichOptionalInt} /** * A log which presents a unified view of local and tiered log segments. 
@@ -943,7 +943,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, } } - def latestEpoch: Option[Int] = leaderEpochCache.flatMap(_.latestEpoch.asScala) + def latestEpoch: Option[Int] = leaderEpochCache.flatMap(_.latestEpoch.toScala) def endOffsetForEpoch(leaderEpoch: Int): Option[OffsetAndEpoch] = { leaderEpochCache.flatMap { cache => @@ -959,7 +959,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, private def maybeIncrementFirstUnstableOffset(): Unit = lock synchronized { localLog.checkIfMemoryMappedBufferClosed() - val updatedFirstUnstableOffset = producerStateManager.firstUnstableOffset.asScala match { + val updatedFirstUnstableOffset = producerStateManager.firstUnstableOffset.toScala match { case Some(logOffsetMetadata) if logOffsetMetadata.messageOffsetOnly || logOffsetMetadata.messageOffset < logStartOffset => val offset = math.max(logOffsetMetadata.messageOffset, logStartOffset) Some(maybeConvertToOffsetMetadata(offset)) @@ -1282,7 +1282,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, // The first cached epoch usually corresponds to the log start offset, but we have to verify this since // it may not be true following a message format version bump as the epoch will not be available for // log entries written in the older format. - val earliestEpochEntry = leaderEpochCache.asJava.flatMap(_.earliestEntry()) + val earliestEpochEntry = leaderEpochCache.toJava.flatMap(_.earliestEntry()) val epochOpt = if (earliestEpochEntry.isPresent && earliestEpochEntry.get().startOffset <= logStartOffset) { Optional.of[Integer](earliestEpochEntry.get().epoch) } else Optional.empty[Integer]() @@ -1340,7 +1340,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, val position = latestTimestampSegment.offsetIndex.lookup(maxTimestampSoFar.offset) val timestampAndOffsetOpt = latestTimestampSegment.log.batchesFrom(position.position).asScala .find(_.maxTimestamp() == maxTimestampSoFar.timestamp) - .flatMap(batch => batch.offsetOfMaxTimestamp().asScala.map(new TimestampAndOffset(batch.maxTimestamp(), _, + .flatMap(batch => batch.offsetOfMaxTimestamp().toScala.map(new TimestampAndOffset(batch.maxTimestamp(), _, Optional.of[Integer](batch.partitionLeaderEpoch()).filter(_ >= 0)))) OffsetResultHolder(timestampAndOffsetOpt) } else { @@ -1376,7 +1376,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, // constant time access while being safe to use with concurrent collections unlike `toArray`. 
val segmentsCopy = logSegments.asScala.toBuffer val targetSeg = segmentsCopy.find(_.largestTimestamp >= targetTimestamp) - targetSeg.flatMap(_.findOffsetByTimestamp(targetTimestamp, startOffset).asScala) + targetSeg.flatMap(_.findOffsetByTimestamp(targetTimestamp, startOffset).toScala) } def legacyFetchOffsetsBefore(timestamp: Long, maxNumOffsets: Int): Seq[Long] = { @@ -2051,7 +2051,7 @@ object UnifiedLog extends Logging { segments, logStartOffset, recoveryPoint, - leaderEpochCache.asJava, + leaderEpochCache.toJava, producerStateManager, numRemainingSegments, isRemoteLogEnabled, diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala index fc215adb3aa..c48811439e2 100644 --- a/core/src/main/scala/kafka/network/RequestChannel.scala +++ b/core/src/main/scala/kafka/network/RequestChannel.scala @@ -38,7 +38,7 @@ import org.apache.kafka.server.metrics.KafkaMetricsGroup import org.apache.kafka.network.RequestConvertToJson import scala.jdk.CollectionConverters._ -import scala.compat.java8.OptionConverters._ +import scala.jdk.OptionConverters.RichOption import scala.reflect.ClassTag object RequestChannel extends Logging { @@ -250,7 +250,7 @@ object RequestChannel extends Logging { recordNetworkThreadTimeCallback.foreach(record => record(networkThreadTimeNanos)) if (isRequestLoggingEnabled) { - val desc = RequestConvertToJson.requestDescMetrics(header, requestLog.asJava, response.responseLog.asJava, + val desc = RequestConvertToJson.requestDescMetrics(header, requestLog.toJava, response.responseLog.toJava, context, session, isForwarded, totalTimeMs, requestQueueTimeMs, apiLocalTimeMs, apiRemoteTimeMs, apiThrottleTimeMs, responseQueueTimeMs, diff --git a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala index 89a16ea91fd..c1d8b4abc8e 100644 --- a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala +++ b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala @@ -49,7 +49,7 @@ import java.nio.file.{Files, NoSuchFileException, Path} import java.util.{Optional, Properties} import scala.annotation.nowarn import scala.collection.mutable -import scala.compat.java8.OptionConverters._ +import scala.jdk.OptionConverters.{RichOption, RichOptional} final class KafkaMetadataLog private ( val log: UnifiedLog, @@ -134,7 +134,7 @@ final class KafkaMetadataLog private ( } override def endOffsetForEpoch(epoch: Int): OffsetAndEpoch = { - (log.endOffsetForEpoch(epoch), earliestSnapshotId().asScala) match { + (log.endOffsetForEpoch(epoch), earliestSnapshotId().toScala) match { case (Some(offsetAndEpoch), Some(snapshotId)) if ( offsetAndEpoch.offset == snapshotId.offset && offsetAndEpoch.leaderEpoch == epoch) => @@ -176,7 +176,7 @@ final class KafkaMetadataLog private ( override def truncateToLatestSnapshot(): Boolean = { val latestEpoch = log.latestEpoch.getOrElse(0) - val (truncated, forgottenSnapshots) = latestSnapshotId().asScala match { + val (truncated, forgottenSnapshots) = latestSnapshotId().toScala match { case Some(snapshotId) if ( snapshotId.epoch > latestEpoch || (snapshotId.epoch == latestEpoch && snapshotId.offset > endOffset().offset) @@ -202,7 +202,7 @@ final class KafkaMetadataLog private ( override def updateHighWatermark(offsetMetadata: LogOffsetMetadata): Unit = { // This API returns the new high watermark, which may be different from the passed offset - val logHighWatermark = offsetMetadata.metadata.asScala match { + val logHighWatermark = offsetMetadata.metadata.toScala 
match { case Some(segmentPosition: SegmentPosition) => log.updateHighWatermark( new internals.log.LogOffsetMetadata( @@ -317,7 +317,7 @@ final class KafkaMetadataLog private ( value } - reader.asJava.asInstanceOf[Optional[RawSnapshotReader]] + reader.toJava.asInstanceOf[Optional[RawSnapshotReader]] } } @@ -329,13 +329,13 @@ final class KafkaMetadataLog private ( override def latestSnapshotId(): Optional[OffsetAndEpoch] = { snapshots synchronized { - snapshots.lastOption.map { case (snapshotId, _) => snapshotId }.asJava + snapshots.lastOption.map { case (snapshotId, _) => snapshotId }.toJava } } override def earliestSnapshotId(): Optional[OffsetAndEpoch] = { snapshots synchronized { - snapshots.headOption.map { case (snapshotId, _) => snapshotId }.asJava + snapshots.headOption.map { case (snapshotId, _) => snapshotId }.toJava } } @@ -363,7 +363,7 @@ final class KafkaMetadataLog private ( private def deleteBeforeSnapshot(snapshotId: OffsetAndEpoch, reason: SnapshotDeletionReason): Boolean = { val (deleted, forgottenSnapshots) = snapshots synchronized { - latestSnapshotId().asScala match { + latestSnapshotId().toScala match { case Some(latestSnapshotId) if snapshots.contains(snapshotId) && startOffset < snapshotId.offset && @@ -387,7 +387,7 @@ final class KafkaMetadataLog private ( */ private def loadSnapshotSizes(): Seq[(OffsetAndEpoch, Long)] = { snapshots.keys.toSeq.flatMap { - snapshotId => readSnapshot(snapshotId).asScala.map { reader => (snapshotId, reader.sizeInBytes())} + snapshotId => readSnapshot(snapshotId).toScala.map { reader => (snapshotId, reader.sizeInBytes())} } } @@ -395,7 +395,7 @@ final class KafkaMetadataLog private ( * Return the max timestamp of the first batch in a snapshot, if the snapshot exists and has records */ private def readSnapshotTimestamp(snapshotId: OffsetAndEpoch): Option[Long] = { - readSnapshot(snapshotId).asScala.map { reader => + readSnapshot(snapshotId).toScala.map { reader => Snapshots.lastContainedLogTimestamp(reader) } } diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala index 73c89a94399..061e7b1171d 100755 --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -44,8 +44,8 @@ import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.locks.ReentrantLock import scala.collection.{Map, Set, mutable} -import scala.compat.java8.OptionConverters._ import scala.jdk.CollectionConverters._ +import scala.jdk.OptionConverters.{RichOption, RichOptionalInt} import scala.math._ /** @@ -266,7 +266,7 @@ abstract class AbstractFetcherThread(name: String, case Errors.FENCED_LEADER_EPOCH => val currentLeaderEpoch = latestEpochsForPartitions.get(tp) - .map(epochEndOffset => Int.box(epochEndOffset.currentLeaderEpoch)).asJava + .map(epochEndOffset => Int.box(epochEndOffset.currentLeaderEpoch)).toJava if (onPartitionFenced(tp, currentLeaderEpoch)) partitionsWithError += tp @@ -365,7 +365,7 @@ abstract class AbstractFetcherThread(name: String, // ReplicaDirAlterThread may have removed topicPartition from the partitionStates after processing the partition data if ((validBytes > 0 || currentFetchState.lag.isEmpty) && partitionStates.contains(topicPartition)) { val lastFetchedEpoch = - if (logAppendInfo.lastLeaderEpoch.isPresent) logAppendInfo.lastLeaderEpoch.asScala else currentFetchState.lastFetchedEpoch + if (logAppendInfo.lastLeaderEpoch.isPresent) 
logAppendInfo.lastLeaderEpoch.toScala else currentFetchState.lastFetchedEpoch // Update partitionStates only if there is no exception during processPartitionData val newFetchState = PartitionFetchState(currentFetchState.topicId, nextOffset, Some(lag), currentFetchState.currentLeaderEpoch, state = Fetching, lastFetchedEpoch) diff --git a/core/src/main/scala/kafka/server/AclApis.scala b/core/src/main/scala/kafka/server/AclApis.scala index 450d9815022..fe4adf5f937 100644 --- a/core/src/main/scala/kafka/server/AclApis.scala +++ b/core/src/main/scala/kafka/server/AclApis.scala @@ -36,8 +36,8 @@ import java.util import java.util.concurrent.CompletableFuture import scala.collection.mutable.ArrayBuffer import scala.collection.mutable -import scala.compat.java8.OptionConverters._ import scala.jdk.CollectionConverters._ +import scala.jdk.OptionConverters.RichOptional /** * Logic to handle ACL requests. @@ -112,7 +112,7 @@ class AclApis(authHelper: AuthHelper, val aclCreationResults = allBindings.map { acl => val result = errorResults.getOrElse(acl, createResults(validBindings.indexOf(acl)).get) val creationResult = new AclCreationResult() - result.exception.asScala.foreach { throwable => + result.exception.toScala.foreach { throwable => val apiError = ApiError.fromThrowable(throwable) creationResult .setErrorCode(apiError.error.code) diff --git a/core/src/main/scala/kafka/server/AlterPartitionManager.scala b/core/src/main/scala/kafka/server/AlterPartitionManager.scala index 5abfde2bc8d..016a7a330fc 100644 --- a/core/src/main/scala/kafka/server/AlterPartitionManager.scala +++ b/core/src/main/scala/kafka/server/AlterPartitionManager.scala @@ -39,7 +39,7 @@ import org.apache.kafka.server.util.Scheduler import scala.collection.mutable import scala.collection.mutable.ListBuffer -import scala.compat.java8.OptionConverters._ +import scala.jdk.OptionConverters.RichOptional /** * Handles updating the ISR by sending AlterPartition requests to the controller (as of 2.7) or by updating ZK directly @@ -328,7 +328,7 @@ class DefaultAlterPartitionManager( val apiError = Errors.forCode(partition.errorCode) debug(s"Controller successfully handled AlterPartition request for $tp: $partition") if (apiError == Errors.NONE) { - LeaderRecoveryState.optionalOf(partition.leaderRecoveryState).asScala match { + LeaderRecoveryState.optionalOf(partition.leaderRecoveryState).toScala match { case Some(leaderRecoveryState) => partitionResponses(tp) = Right( new LeaderAndIsr( diff --git a/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala b/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala index f7f593badd9..b7e7439e55b 100644 --- a/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala +++ b/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala @@ -37,8 +37,8 @@ import org.apache.kafka.coordinator.share.ShareCoordinator import org.apache.kafka.server.{ControllerRequestCompletionHandler, NodeToControllerChannelManager} import scala.collection.{Map, Seq, Set, mutable} -import scala.compat.java8.OptionConverters._ import scala.jdk.CollectionConverters._ +import scala.jdk.OptionConverters.RichOptional trait AutoTopicCreationManager { @@ -195,7 +195,7 @@ class DefaultAutoTopicCreationManager( val request = metadataRequestContext.map { context => val requestVersion = - channelManager.controllerApiVersions.asScala match { + channelManager.controllerApiVersions.toScala match { case None => // We will rely on the Metadata request to be retried in the case // that the latest version is not 
usable by the controller. diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index 40c4930f84a..e36dd81a407 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -64,8 +64,8 @@ import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.locks.{Condition, ReentrantLock} import java.util.concurrent.{CompletableFuture, ExecutionException, TimeUnit, TimeoutException} import scala.collection.Map -import scala.compat.java8.OptionConverters.RichOptionForJava8 import scala.jdk.CollectionConverters._ +import scala.jdk.OptionConverters.RichOption /** @@ -576,7 +576,7 @@ class BrokerServer( // authorizer future is completed. val endpointReadyFutures = { val builder = new EndpointReadyFutures.Builder() - builder.build(authorizer.asJava, + builder.build(authorizer.toJava, new KafkaAuthorizerServerInfo( new ClusterResource(clusterId), config.nodeId, @@ -679,7 +679,7 @@ class BrokerServer( protected def createRemoteLogManager(): Option[RemoteLogManager] = { if (config.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) { Some(new RemoteLogManager(config.remoteLogManagerConfig, config.brokerId, config.logDirs.head, clusterId, time, - (tp: TopicPartition) => logManager.getLog(tp).asJava, + (tp: TopicPartition) => logManager.getLog(tp).toJava, (tp: TopicPartition, remoteLogStartOffset: java.lang.Long) => { logManager.getLog(tp).foreach { log => log.updateLogStartOffsetFromRemoteTier(remoteLogStartOffset) diff --git a/core/src/main/scala/kafka/server/ConfigHelper.scala b/core/src/main/scala/kafka/server/ConfigHelper.scala index 339b81865cc..095a474441a 100644 --- a/core/src/main/scala/kafka/server/ConfigHelper.scala +++ b/core/src/main/scala/kafka/server/ConfigHelper.scala @@ -39,8 +39,8 @@ import org.apache.kafka.storage.internals.log.LogConfig import scala.collection.mutable.ListBuffer import scala.collection.{Map, mutable} -import scala.compat.java8.OptionConverters._ import scala.jdk.CollectionConverters._ +import scala.jdk.OptionConverters.RichOptional class ConfigHelper(metadataCache: MetadataCache, config: KafkaConfig, configRepository: ConfigRepository) extends Logging { @@ -188,7 +188,7 @@ class ConfigHelper(metadataCache: MetadataCache, config: KafkaConfig, configRepo def createGroupConfigEntry(groupConfig: GroupConfig, groupProps: Properties, includeSynonyms: Boolean, includeDocumentation: Boolean) (name: String, value: Any): DescribeConfigsResponseData.DescribeConfigsResourceResult = { val allNames = brokerSynonyms(name) - val configEntryType = GroupConfig.configType(name).asScala + val configEntryType = GroupConfig.configType(name).toScala val isSensitive = KafkaConfig.maybeSensitive(configEntryType) val valueAsString = if (isSensitive) null else ConfigDef.convertToString(value, configEntryType.orNull) val allSynonyms = { @@ -212,7 +212,7 @@ class ConfigHelper(metadataCache: MetadataCache, config: KafkaConfig, configRepo def createTopicConfigEntry(logConfig: LogConfig, topicProps: Properties, includeSynonyms: Boolean, includeDocumentation: Boolean) (name: String, value: Any): DescribeConfigsResponseData.DescribeConfigsResourceResult = { - val configEntryType = LogConfig.configType(name).asScala + val configEntryType = LogConfig.configType(name).toScala val isSensitive = KafkaConfig.maybeSensitive(configEntryType) val valueAsString = if (isSensitive) null else ConfigDef.convertToString(value, configEntryType.orNull) val allSynonyms = { 
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala index c03a0b95e61..f1eba316a8c 100644 --- a/core/src/main/scala/kafka/server/ControllerServer.scala +++ b/core/src/main/scala/kafka/server/ControllerServer.scala @@ -53,8 +53,8 @@ import java.util import java.util.{Optional, OptionalLong} import java.util.concurrent.locks.ReentrantLock import java.util.concurrent.{CompletableFuture, TimeUnit} -import scala.compat.java8.OptionConverters._ import scala.jdk.CollectionConverters._ +import scala.jdk.OptionConverters.RichOption /** @@ -172,7 +172,7 @@ class ControllerServer( val endpointReadyFutures = { val builder = new EndpointReadyFutures.Builder() - builder.build(authorizer.asJava, + builder.build(authorizer.toJava, new KafkaAuthorizerServerInfo( new ClusterResource(clusterId), config.nodeId, @@ -229,8 +229,8 @@ class ControllerServer( setLeaderImbalanceCheckIntervalNs(leaderImbalanceCheckIntervalNs). setMaxIdleIntervalNs(maxIdleIntervalNs). setMetrics(quorumControllerMetrics). - setCreateTopicPolicy(createTopicPolicy.asJava). - setAlterConfigPolicy(alterConfigPolicy.asJava). + setCreateTopicPolicy(createTopicPolicy.toJava). + setAlterConfigPolicy(alterConfigPolicy.toJava). setConfigurationValidator(new ControllerConfigurationValidator(sharedServer.brokerConfig)). setStaticConfig(config.originals). setBootstrapMetadata(bootstrapMetadata). diff --git a/core/src/main/scala/kafka/server/EnvelopeUtils.scala b/core/src/main/scala/kafka/server/EnvelopeUtils.scala index 4988aa39b2a..3fdbf337033 100644 --- a/core/src/main/scala/kafka/server/EnvelopeUtils.scala +++ b/core/src/main/scala/kafka/server/EnvelopeUtils.scala @@ -26,7 +26,8 @@ import org.apache.kafka.common.requests.{EnvelopeRequest, RequestContext, Reques import org.apache.kafka.common.security.auth.KafkaPrincipal import org.apache.kafka.network.metrics.RequestChannelMetrics -import scala.compat.java8.OptionConverters._ +import scala.jdk.OptionConverters.RichOptional + object EnvelopeUtils { def handleEnvelopeRequest( @@ -124,7 +125,7 @@ object EnvelopeUtils { envelopeContext: RequestContext, principalBytes: Array[Byte] ): KafkaPrincipal = { - envelopeContext.principalSerde.asScala match { + envelopeContext.principalSerde.toScala match { case Some(serde) => try { serde.deserialize(principalBytes) diff --git a/core/src/main/scala/kafka/server/ForwardingManager.scala b/core/src/main/scala/kafka/server/ForwardingManager.scala index 85f48f4b2dd..f9d3cd9f967 100644 --- a/core/src/main/scala/kafka/server/ForwardingManager.scala +++ b/core/src/main/scala/kafka/server/ForwardingManager.scala @@ -28,7 +28,7 @@ import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, Enve import org.apache.kafka.server.{ControllerRequestCompletionHandler, NodeToControllerChannelManager} import java.util.concurrent.TimeUnit -import scala.compat.java8.OptionConverters._ +import scala.jdk.OptionConverters.RichOptional trait ForwardingManager { def close(): Unit @@ -98,7 +98,7 @@ object ForwardingManager { private[server] def buildEnvelopeRequest(context: RequestContext, forwardRequestBuffer: ByteBuffer): EnvelopeRequest.Builder = { - val principalSerde = context.principalSerde.asScala.getOrElse( + val principalSerde = context.principalSerde.toScala.getOrElse( throw new IllegalArgumentException(s"Cannot deserialize principal from request context $context " + "since there is no serde defined") ) @@ -188,7 +188,7 @@ class ForwardingManagerImpl( forwardingManagerMetrics.close() 
override def controllerApiVersions: Option[NodeApiVersions] = - channelManager.controllerApiVersions.asScala + channelManager.controllerApiVersions.toScala private def parseResponse( buffer: ByteBuffer, diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index ebc6937ef1d..336a1ac8979 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -54,9 +54,9 @@ import org.apache.kafka.storage.internals.log.LogConfig.MessageFormatVersion import org.apache.zookeeper.client.ZKClientConfig import scala.annotation.nowarn -import scala.compat.java8.OptionConverters._ import scala.jdk.CollectionConverters._ import scala.collection.{Map, Seq} +import scala.jdk.OptionConverters.RichOptional object KafkaConfig { @@ -149,8 +149,8 @@ object KafkaConfig { def loggableValue(resourceType: ConfigResource.Type, name: String, value: String): String = { val maybeSensitive = resourceType match { case ConfigResource.Type.BROKER => KafkaConfig.maybeSensitive(KafkaConfig.configType(name)) - case ConfigResource.Type.TOPIC => KafkaConfig.maybeSensitive(LogConfig.configType(name).asScala) - case ConfigResource.Type.GROUP => KafkaConfig.maybeSensitive(GroupConfig.configType(name).asScala) + case ConfigResource.Type.TOPIC => KafkaConfig.maybeSensitive(LogConfig.configType(name).toScala) + case ConfigResource.Type.GROUP => KafkaConfig.maybeSensitive(GroupConfig.configType(name).toScala) case ConfigResource.Type.BROKER_LOGGER => false case ConfigResource.Type.CLIENT_METRICS => false case _ => true diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index bbac5a5ed30..0b8d3a1dd81 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -76,8 +76,8 @@ import java.util.concurrent._ import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger} import java.util.{Optional, OptionalInt, OptionalLong} import scala.collection.{Map, Seq} -import scala.compat.java8.OptionConverters.RichOptionForJava8 import scala.jdk.CollectionConverters._ +import scala.jdk.OptionConverters.RichOption object KafkaServer { def zkClientConfigFromKafkaConfig(config: KafkaConfig, forceZkSslClientEnable: Boolean = false): ZKClientConfig = { @@ -702,7 +702,7 @@ class KafkaServer( protected def createRemoteLogManager(): Option[RemoteLogManager] = { if (config.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) { Some(new RemoteLogManager(config.remoteLogManagerConfig, config.brokerId, config.logDirs.head, clusterId, time, - (tp: TopicPartition) => logManager.getLog(tp).asJava, + (tp: TopicPartition) => logManager.getLog(tp).toJava, (tp: TopicPartition, remoteLogStartOffset: java.lang.Long) => { logManager.getLog(tp).foreach { log => log.updateLogStartOffsetFromRemoteTier(remoteLogStartOffset) diff --git a/core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala b/core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala index 151af6e914b..dec1d0917d1 100644 --- a/core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala +++ b/core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala @@ -34,8 +34,8 @@ import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams, FetchPa import java.util import java.util.Optional import scala.collection.{Map, Seq, Set, mutable} -import scala.compat.java8.OptionConverters.RichOptionForJava8 import scala.jdk.CollectionConverters._ +import 
scala.jdk.OptionConverters.RichOption /** * Facilitates fetches from a local replica leader. @@ -206,7 +206,7 @@ class LocalLeaderEndPoint(sourceBroker: BrokerEndPoint, try { val logStartOffset = replicaManager.futureLocalLogOrException(topicPartition).logStartOffset val lastFetchedEpoch = if (isTruncationOnFetchSupported) - fetchState.lastFetchedEpoch.map(_.asInstanceOf[Integer]).asJava + fetchState.lastFetchedEpoch.map(_.asInstanceOf[Integer]).toJava else Optional.empty[Integer] val topicId = fetchState.topicId.getOrElse(Uuid.ZERO_UUID) diff --git a/core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala b/core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala index a70b5545fee..59923056f73 100644 --- a/core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala +++ b/core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala @@ -38,8 +38,8 @@ import org.apache.kafka.server.util.{InterBrokerSendThread, RequestAndCompletion import java.util import java.util.Optional import scala.collection.Seq -import scala.compat.java8.OptionConverters._ import scala.jdk.CollectionConverters._ +import scala.jdk.OptionConverters.{RichOption, RichOptionalInt} case class ControllerInformation( node: Option[Node], @@ -115,7 +115,7 @@ class RaftControllerNodeProvider( private def idToNode(id: Int): Option[Node] = raftManager.voterNode(id, listenerName) override def getControllerInfo(): ControllerInformation = - ControllerInformation(raftManager.leaderAndEpoch.leaderId.asScala.flatMap(idToNode), + ControllerInformation(raftManager.leaderAndEpoch.leaderId.toScala.flatMap(idToNode), listenerName, securityProtocol, saslMechanism, isZkController = false) } @@ -231,7 +231,7 @@ class NodeToControllerChannelManagerImpl( def controllerApiVersions(): Optional[NodeApiVersions] = { requestThread.activeControllerAddress().flatMap { activeController => Option(apiVersions.get(activeController.idString)) - }.asJava + }.toJava } def getTimeoutMs: Long = retryTimeoutMs diff --git a/core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala b/core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala index 3cdff49d408..76c2b07fd26 100644 --- a/core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala +++ b/core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala @@ -30,12 +30,12 @@ import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.{OffsetFo import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, ListOffsetsRequest, ListOffsetsResponse, OffsetsForLeaderEpochRequest, OffsetsForLeaderEpochResponse} -import org.apache.kafka.server.common.{OffsetAndEpoch, MetadataVersion} +import org.apache.kafka.server.common.{MetadataVersion, OffsetAndEpoch} import org.apache.kafka.server.common.MetadataVersion.IBP_0_10_1_IV2 import scala.jdk.CollectionConverters._ import scala.collection.{Map, mutable} -import scala.compat.java8.OptionConverters.RichOptionForJava8 +import scala.jdk.OptionConverters.RichOption /** * Facilitates fetches from a remote replica leader. 
@@ -187,7 +187,7 @@ class RemoteLeaderEndPoint(logPrefix: String, try { val logStartOffset = replicaManager.localLogOrException(topicPartition).logStartOffset val lastFetchedEpoch = if (isTruncationOnFetchSupported) - fetchState.lastFetchedEpoch.map(_.asInstanceOf[Integer]).asJava + fetchState.lastFetchedEpoch.map(_.asInstanceOf[Integer]).toJava else Optional.empty[Integer] builder.add(topicPartition, new FetchRequest.PartitionData( diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 65da75c8865..38701ba1cd3 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -73,8 +73,8 @@ import java.util.concurrent.locks.Lock import java.util.concurrent.{CompletableFuture, Future, RejectedExecutionException, TimeUnit} import java.util.{Collections, Optional, OptionalInt, OptionalLong} import scala.collection.{Map, Seq, Set, immutable, mutable} -import scala.compat.java8.OptionConverters._ import scala.jdk.CollectionConverters._ +import scala.jdk.OptionConverters.{RichOption, RichOptional} /* * Result metadata of a log append operation on the log @@ -142,7 +142,7 @@ case class LogReadResult(info: FetchDataInfo, this.highWatermark, this.leaderLogStartOffset, this.info.records, - this.divergingEpoch.asJava, + this.divergingEpoch.toJava, if (this.lastStableOffset.isDefined) OptionalLong.of(this.lastStableOffset.get) else OptionalLong.empty(), this.info.abortedTransactions, if (this.preferredReadReplica.isDefined) OptionalInt.of(this.preferredReadReplica.get) else OptionalInt.empty(), @@ -1778,7 +1778,7 @@ class ReplicaManager(val config: KafkaConfig, throw new InconsistentTopicIdException("Topic ID in the fetch session did not match the topic ID in the log.") // If we are the leader, determine the preferred read-replica - val preferredReadReplica = params.clientMetadata.asScala.flatMap( + val preferredReadReplica = params.clientMetadata.toScala.flatMap( metadata => findPreferredReadReplica(partition, metadata, params.replicaId, fetchInfo.fetchOffset, fetchTimeMs)) if (preferredReadReplica.isDefined) { @@ -1813,7 +1813,7 @@ class ReplicaManager(val config: KafkaConfig, val fetchDataInfo = checkFetchDataInfo(partition, readInfo.fetchedData) LogReadResult(info = fetchDataInfo, - divergingEpoch = readInfo.divergingEpoch.asScala, + divergingEpoch = readInfo.divergingEpoch.toScala, highWatermark = readInfo.highWatermark, leaderLogStartOffset = readInfo.logStartOffset, leaderLogEndOffset = readInfo.logEndOffset, @@ -1980,7 +1980,7 @@ class ReplicaManager(val config: KafkaConfig, replicaInfoSet.add(leaderReplica) val partitionInfo = new DefaultPartitionView(replicaInfoSet.asJava, leaderReplica) - replicaSelector.select(partition.topicPartition, clientMetadata, partitionInfo).asScala.collect { + replicaSelector.select(partition.topicPartition, clientMetadata, partitionInfo).toScala.collect { // Even though the replica selector can return the leader, we don't want to send it out with the // FetchResponse, so we exclude it here case selected if !selected.endpoint.isEmpty && selected != leaderReplica => selected.endpoint.id @@ -3008,7 +3008,7 @@ class ReplicaManager(val config: KafkaConfig, partitionsToStartFetching.foreachEntry { (topicPartition, partition) => val nodeOpt = partition.leaderReplicaIdOpt .flatMap(leaderId => Option(newImage.cluster.broker(leaderId))) - .flatMap(_.node(listenerName).asScala) + .flatMap(_.node(listenerName).toScala) nodeOpt match { case 
Some(node) => diff --git a/core/src/main/scala/kafka/server/metadata/ClientQuotaMetadataManager.scala b/core/src/main/scala/kafka/server/metadata/ClientQuotaMetadataManager.scala index 7404749d01d..7ed687da5a1 100644 --- a/core/src/main/scala/kafka/server/metadata/ClientQuotaMetadataManager.scala +++ b/core/src/main/scala/kafka/server/metadata/ClientQuotaMetadataManager.scala @@ -28,7 +28,8 @@ import java.net.{InetAddress, UnknownHostException} import org.apache.kafka.image.{ClientQuotaDelta, ClientQuotasDelta} import org.apache.kafka.server.config.{QuotaConfigs, ZooKeeperInternals} -import scala.compat.java8.OptionConverters._ +import scala.jdk.OptionConverters.RichOptionalDouble + // A strict hierarchy of entities that we support @@ -97,7 +98,7 @@ class ClientQuotaMetadataManager(private[metadata] val quotaManagers: QuotaManag } } quotaDelta.changes().forEach { (key, value) => - handleUserClientQuotaChange(userClientEntity, key, value.asScala) + handleUserClientQuotaChange(userClientEntity, key, value.toScala) } } else { warn(s"Ignoring unsupported quota entity $entity.") @@ -124,7 +125,7 @@ class ClientQuotaMetadataManager(private[metadata] val quotaManagers: QuotaManag warn(s"Ignoring unexpected quota key $quotaName for entity $ipEntity") } else { try { - connectionQuotas.updateIpConnectionRateQuota(inetAddress, quotaValue.asScala.map(_.toInt)) + connectionQuotas.updateIpConnectionRateQuota(inetAddress, quotaValue.toScala.map(_.toInt)) } catch { case t: Throwable => error(s"Failed to update IP quota $ipEntity", t) } diff --git a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala index f8353df8053..e6a874c45d1 100644 --- a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala +++ b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala @@ -41,8 +41,8 @@ import java.util.function.Supplier import java.util.{Collections, Properties} import scala.collection.mutable.ListBuffer import scala.collection.{Map, Seq, Set, mutable} -import scala.compat.java8.OptionConverters._ import scala.jdk.CollectionConverters._ +import scala.jdk.OptionConverters.RichOptional import scala.util.control.Breaks._ @@ -238,7 +238,7 @@ class KRaftMetadataCache( * @return None if broker is not alive or if the broker does not have a listener named `listenerName`. */ private def getAliveEndpoint(image: MetadataImage, id: Int, listenerName: ListenerName): Option[Node] = { - Option(image.cluster().broker(id)).flatMap(_.node(listenerName.value()).asScala) + Option(image.cluster().broker(id)).flatMap(_.node(listenerName.value()).toScala) } // errorUnavailableEndpoints exists to support v0 MetadataResponses @@ -373,12 +373,12 @@ class KRaftMetadataCache( override def getAliveBrokerNode(brokerId: Int, listenerName: ListenerName): Option[Node] = { Option(_currentImage.cluster().broker(brokerId)).filterNot(_.fenced()). - flatMap(_.node(listenerName.value()).asScala) + flatMap(_.node(listenerName.value()).toScala) } override def getAliveBrokerNodes(listenerName: ListenerName): Seq[Node] = { _currentImage.cluster().brokers().values().asScala.filterNot(_.fenced()). 
- flatMap(_.node(listenerName.value()).asScala).toSeq + flatMap(_.node(listenerName.value()).toScala).toSeq } // Does NOT include offline replica metadata @@ -478,7 +478,7 @@ class KRaftMetadataCache( val nodes = new util.HashMap[Integer, Node] image.cluster().brokers().values().forEach { broker => if (!broker.fenced()) { - broker.node(listenerName.value()).asScala.foreach { node => + broker.node(listenerName.value()).toScala.foreach { node => nodes.put(broker.id(), node) } } diff --git a/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala index b385f9f222a..6a877bd6361 100644 --- a/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala @@ -30,7 +30,7 @@ import org.apache.kafka.common.resource.ResourceType import org.apache.kafka.common.utils.Utils import org.apache.kafka.coordinator.group.GroupCoordinatorConfig import org.apache.kafka.security.authorizer.AclEntry -import org.apache.kafka.server.config.{ServerConfigs, ReplicationConfigs, ServerLogConfigs} +import org.apache.kafka.server.config.{ReplicationConfigs, ServerConfigs, ServerLogConfigs} import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo, Timeout} import org.junit.jupiter.params.ParameterizedTest @@ -38,7 +38,7 @@ import org.junit.jupiter.params.provider.ValueSource import scala.jdk.CollectionConverters._ import scala.collection.Seq -import scala.compat.java8.OptionConverters._ +import scala.jdk.OptionConverters.RichOption /** * Base integration test cases for [[Admin]]. Each test case added here will be executed @@ -78,7 +78,7 @@ abstract class BaseAdminIntegrationTest extends IntegrationTestHarness with Logg val newTopics = Seq( new NewTopic("mytopic", Map((0: Integer) -> Seq[Integer](1, 2).asJava, (1: Integer) -> Seq[Integer](2, 0).asJava).asJava), new NewTopic("mytopic2", 3, 3.toShort), - new NewTopic("mytopic3", Option.empty[Integer].asJava, Option.empty[java.lang.Short].asJava) + new NewTopic("mytopic3", Option.empty[Integer].toJava, Option.empty[java.lang.Short].toJava) ) val validateResult = client.createTopics(newTopics.asJava, new CreateTopicsOptions().validateOnly(true)) validateResult.all.get() diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala index 2bf6c722cb5..4aa5a5d14bc 100644 --- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala +++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala @@ -41,9 +41,9 @@ import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.ValueSource import scala.collection.mutable -import scala.compat.java8.OptionConverters import scala.concurrent.ExecutionException import scala.jdk.CollectionConverters._ +import scala.jdk.javaapi.OptionConverters abstract class BaseProducerSendTest extends KafkaServerTestHarness { diff --git a/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala b/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala index 7a778a12e03..bba927f1068 100644 --- a/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala +++ b/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala @@ -13,6 +13,7 @@ package kafka.api import kafka.security.JaasTestUtils + import 
java.util import java.util.Properties import kafka.security.authorizer.AclAuthorizer @@ -30,8 +31,8 @@ import org.apache.kafka.server.config.{ServerConfigs, ZkConfigs} import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNull} import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo} -import scala.compat.java8.OptionConverters import scala.jdk.CollectionConverters._ +import scala.jdk.javaapi.OptionConverters object DescribeAuthorizedOperationsTest { val Group1 = "group1" diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala index 8f6271c8e5c..7de99f5a2d2 100644 --- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala +++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala @@ -37,7 +37,7 @@ import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo} import scala.collection.mutable import scala.collection.Seq -import scala.compat.java8.OptionConverters +import scala.jdk.javaapi.OptionConverters /** * A helper class for writing integration tests that involve producers, consumers, and servers diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala index 2040994be52..a107d46cedb 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala @@ -41,7 +41,7 @@ import org.apache.kafka.common.config.{ConfigResource, LogLevelConfig, SslConfig import org.apache.kafka.common.errors._ import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity, ClientQuotaFilter} -import org.apache.kafka.common.requests.{DeleteRecordsRequest} +import org.apache.kafka.common.requests.DeleteRecordsRequest import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourceType} import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer} import org.apache.kafka.common.utils.{Time, Utils} @@ -62,10 +62,10 @@ import org.slf4j.LoggerFactory import java.util.AbstractMap.SimpleImmutableEntry import scala.annotation.nowarn import scala.collection.Seq -import scala.compat.java8.OptionConverters._ import scala.concurrent.duration.Duration import scala.concurrent.{Await, Future} import scala.jdk.CollectionConverters._ +import scala.jdk.OptionConverters.RichOption import scala.util.{Random, Using} /** @@ -653,7 +653,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { val newTopics = Seq( new NewTopic("mytopic", Map((0: Integer) -> Seq[Integer](1, 2).asJava, (1: Integer) -> Seq[Integer](2, 0).asJava).asJava), new NewTopic("mytopic2", 3, 3.toShort), - new NewTopic("mytopic3", Option.empty[Integer].asJava, Option.empty[java.lang.Short].asJava) + new NewTopic("mytopic3", Option.empty[Integer].toJava, Option.empty[java.lang.Short].toJava) ) val createResult = client.createTopics(newTopics.asJava) createResult.all.get() @@ -3393,7 +3393,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { (1: Integer) -> Seq[Integer](2, 0).asJava).asJava). 
configs(Collections.singletonMap(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, "9999999")), new NewTopic("bar", 3, 3.toShort), - new NewTopic("baz", Option.empty[Integer].asJava, Option.empty[java.lang.Short].asJava) + new NewTopic("baz", Option.empty[Integer].toJava, Option.empty[java.lang.Short].toJava) ) val result = client.createTopics(newTopics.asJava) result.all.get() diff --git a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala index d99c8d41640..d4e05a065db 100644 --- a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala @@ -13,6 +13,7 @@ package kafka.api import kafka.security.JaasTestUtils + import java.time.Duration import java.util.{Collections, Properties} import java.util.concurrent.{ExecutionException, TimeUnit} @@ -32,7 +33,8 @@ import org.apache.kafka.coordinator.transaction.TransactionLogConfig import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.ValueSource -import scala.compat.java8.OptionConverters +import scala.jdk.javaapi.OptionConverters + class SaslClientsWithInvalidCredentialsTest extends AbstractSaslTest { private val kafkaClientSaslMechanism = "SCRAM-SHA-256" diff --git a/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala index 5f1226adcea..944b3aa4dbe 100644 --- a/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala @@ -37,8 +37,8 @@ import javax.security.auth.Subject import javax.security.auth.callback._ import javax.security.auth.login.AppConfigurationEntry import scala.collection.Seq -import scala.compat.java8.OptionConverters import scala.jdk.CollectionConverters._ +import scala.jdk.javaapi.OptionConverters object SaslPlainSslEndToEndAuthorizationTest { diff --git a/core/src/test/scala/integration/kafka/api/SaslSetup.scala b/core/src/test/scala/integration/kafka/api/SaslSetup.scala index 25cd86cab53..a9eb15a24b2 100644 --- a/core/src/test/scala/integration/kafka/api/SaslSetup.scala +++ b/core/src/test/scala/integration/kafka/api/SaslSetup.scala @@ -39,9 +39,9 @@ import java.util import java.util.Properties import javax.security.auth.login.Configuration import scala.collection.Seq -import scala.compat.java8.OptionConverters import scala.jdk.CollectionConverters._ import scala.jdk.OptionConverters._ +import scala.jdk.javaapi.OptionConverters import scala.util.Using /* diff --git a/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala index 5b6e3fc2c24..574e9c5cdf6 100644 --- a/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala @@ -41,9 +41,9 @@ import org.junit.jupiter.params.provider.ValueSource import java.util import java.util.Collections import scala.collection.Seq -import scala.compat.java8.OptionConverters._ import scala.concurrent.ExecutionException import scala.jdk.CollectionConverters._ +import scala.jdk.OptionConverters.RichOption import scala.util.{Failure, Success, Try} @Timeout(120) @@ -583,7 +583,7 @@ class 
SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu val configsOverride = Map(TopicConfig.SEGMENT_BYTES_CONFIG -> "100000").asJava val newTopics = Seq( new NewTopic(topic1, 2, 3.toShort).configs(configsOverride), - new NewTopic(topic2, Option.empty[Integer].asJava, Option.empty[java.lang.Short].asJava).configs(configsOverride)) + new NewTopic(topic2, Option.empty[Integer].toJava, Option.empty[java.lang.Short].toJava).configs(configsOverride)) val validateResult = client.createTopics(newTopics.asJava, new CreateTopicsOptions().validateOnly(true)) validateResult.all.get() waitForTopics(client, List(), topics) diff --git a/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala index 5d365e4c05b..5063e79ad08 100644 --- a/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala @@ -39,7 +39,7 @@ import org.junit.jupiter.params.provider.ValueSource import scala.jdk.CollectionConverters._ import scala.collection.{Seq, mutable} -import scala.compat.java8.OptionConverters +import scala.jdk.javaapi.OptionConverters object SslAdminIntegrationTest { @volatile var semaphore: Option[Semaphore] = None diff --git a/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala index 5fc8295626e..11cbf83e178 100644 --- a/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala +++ b/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala @@ -29,7 +29,7 @@ import org.apache.kafka.common.utils.Java import org.junit.jupiter.api.{BeforeEach, TestInfo} import java.util.Optional -import scala.compat.java8.OptionConverters +import scala.jdk.javaapi.OptionConverters object SslEndToEndAuthorizationTest { val superuserCn = "super-user" diff --git a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala index 2f8bf03903b..a0204f2ad94 100755 --- a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala +++ b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala @@ -46,8 +46,8 @@ import org.junit.jupiter.api.{AfterAll, AfterEach, BeforeAll, BeforeEach, Tag, T import java.nio.file.{Files, Paths} import scala.collection.Seq -import scala.compat.java8.OptionConverters._ import scala.jdk.CollectionConverters._ +import scala.jdk.OptionConverters.RichOptional trait QuorumImplementation { def createBroker( @@ -279,7 +279,7 @@ abstract class QuorumTestHarness extends Logging { tearDown() } }) - val name = testInfo.getTestMethod.asScala + val name = testInfo.getTestMethod.toScala .map(_.toString) .getOrElse("[unspecified]") if (TestInfoUtils.isKRaft(testInfo)) { diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala index a4c9e7ab294..6d512c23853 100644 --- a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala +++ b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala @@ -48,9 +48,9 @@ import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} import org.mockito.ArgumentMatchers import org.mockito.Mockito.{mock, when} -import scala.compat.java8.OptionConverters._ import scala.concurrent.duration._ import scala.jdk.CollectionConverters._ +import 
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
index a4c9e7ab294..6d512c23853 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
@@ -48,9 +48,9 @@ import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.mockito.ArgumentMatchers
 import org.mockito.Mockito.{mock, when}
 
-import scala.compat.java8.OptionConverters._
 import scala.concurrent.duration._
 import scala.jdk.CollectionConverters._
+import scala.jdk.OptionConverters.RichOption
 
 /**
  * Verifies that slow appends to log don't block request threads processing replica fetch requests.
@@ -324,7 +324,7 @@ class PartitionLockTest extends Logging {
       segments,
       0L,
       0L,
-      leaderEpochCache.asJava,
+      leaderEpochCache.toJava,
       producerStateManager,
       new ConcurrentHashMap[String, Integer],
       false
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index 52647beb0d1..f09f4e0821d 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -67,8 +67,8 @@ import org.apache.kafka.storage.log.metrics.BrokerTopicStats
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
 
-import scala.compat.java8.OptionConverters._
 import scala.jdk.CollectionConverters._
+import scala.jdk.OptionConverters.RichOption
 
 object PartitionTest {
   class MockPartitionListener extends PartitionListener {
@@ -148,7 +148,7 @@ object PartitionTest {
       minBytes,
       maxBytes,
       isolation,
-      clientMetadata.asJava
+      clientMetadata.toJava
     )
   }
 }
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
index 1722f1d472a..68abd08f22f 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
@@ -37,7 +37,7 @@ import java.lang.{Long => JLong}
 import java.util
 import java.util.concurrent.ConcurrentHashMap
 import scala.collection.mutable
-import scala.compat.java8.OptionConverters._
+import scala.jdk.OptionConverters.RichOption
 
 /**
  * Unit tests for the log cleaning logic
@@ -124,7 +124,7 @@ class LogCleanerManagerTest extends Logging {
       segments,
       0L,
       0L,
-      leaderEpochCache.asJava,
+      leaderEpochCache.toJava,
       producerStateManager,
       new ConcurrentHashMap[String, Integer],
       false
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 63c02224f18..9086626706c 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -45,8 +45,8 @@ import java.nio.file.Paths
 import java.util.Properties
 import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, TimeUnit}
 import scala.collection._
-import scala.compat.java8.OptionConverters._
 import scala.jdk.CollectionConverters._
+import scala.jdk.OptionConverters.RichOption
 
 /**
  * Unit tests for the log cleaning logic
@@ -204,7 +204,7 @@ class LogCleanerTest extends Logging {
       logSegments,
       0L,
       0L,
-      leaderEpochCache.asJava,
+      leaderEpochCache.toJava,
       producerStateManager,
       new ConcurrentHashMap[String, Integer],
       false
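The cluster and log-cleaner tests above all convert in the outbound direction: a Scala Option is handed to Java code (constructors such as LogLoader's take java.util.Optional parameters), so the old asJava enrichment becomes RichOption.toJava. A hedged sketch with a hypothetical Java-style consumer standing in for such a constructor:

    import java.util.Optional
    import scala.jdk.OptionConverters.RichOption

    // Hypothetical stand-in for a Java-side parameter like LogLoader's
    // leaderEpochCache, which is declared as Optional on the Java side.
    def loadWithCache(cache: Optional[String]): Unit =
      println(s"leaderEpochCache = $cache")

    val leaderEpochCache: Option[String] = Some("epoch-cache")
    loadWithCache(leaderEpochCache.toJava)   // was leaderEpochCache.asJava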
diff --git a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
index 47e5cd52ba5..b72c1c555db 100644
--- a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
@@ -53,8 +53,8 @@ import java.util.{Optional, OptionalLong, Properties}
 import scala.annotation.nowarn
 import scala.collection.mutable.ListBuffer
 import scala.collection.{Iterable, Map, mutable}
-import scala.compat.java8.OptionConverters._
 import scala.jdk.CollectionConverters._
+import scala.jdk.OptionConverters.{RichOption, RichOptional}
 
 class LogLoaderTest {
   var config: KafkaConfig = _
@@ -163,7 +163,7 @@ class LogLoaderTest {
       this.maxTransactionTimeoutMs, this.producerStateManagerConfig, time)
     val logLoader = new LogLoader(logDir, topicPartition, config, time.scheduler, time,
       logDirFailureChannel, hadCleanShutdown, segments, logStartOffset, logRecoveryPoint,
-      leaderEpochCache.asJava, producerStateManager, new ConcurrentHashMap[String, Integer], false)
+      leaderEpochCache.toJava, producerStateManager, new ConcurrentHashMap[String, Integer], false)
     val offsets = logLoader.load()
     val localLog = new LocalLog(logDir, logConfig, segments, offsets.recoveryPoint,
       offsets.nextOffsetMetadata, mockTime.scheduler, mockTime, topicPartition,
@@ -386,7 +386,7 @@ class LogLoaderTest {
       interceptedLogSegments,
       0L,
       recoveryPoint,
-      leaderEpochCache.asJava,
+      leaderEpochCache.toJava,
       producerStateManager,
       new ConcurrentHashMap[String, Integer],
       false
@@ -452,7 +452,7 @@ class LogLoaderTest {
       segments,
       0L,
       0L,
-      leaderEpochCache.asJava,
+      leaderEpochCache.toJava,
       stateManager,
       new ConcurrentHashMap[String, Integer],
       false
@@ -564,7 +564,7 @@ class LogLoaderTest {
       segments,
       0L,
       0L,
-      leaderEpochCache.asJava,
+      leaderEpochCache.toJava,
       stateManager,
       new ConcurrentHashMap[String, Integer],
       false
@@ -621,7 +621,7 @@ class LogLoaderTest {
       segments,
       0L,
       0L,
-      leaderEpochCache.asJava,
+      leaderEpochCache.toJava,
       stateManager,
       new ConcurrentHashMap[String, Integer],
       false
@@ -677,7 +677,7 @@ class LogLoaderTest {
       segments,
       0L,
       0L,
-      leaderEpochCache.asJava,
+      leaderEpochCache.toJava,
       stateManager,
       new ConcurrentHashMap[String, Integer],
       false
@@ -1638,7 +1638,7 @@ class LogLoaderTest {
     assertEquals(9, log.activeSegment.baseOffset)
     assertEquals(9, log.logEndOffset)
     for (offset <- 1 until 10) {
-      val snapshotFileBeforeDeletion = log.producerStateManager.snapshotFileForOffset(offset).asScala
+      val snapshotFileBeforeDeletion = log.producerStateManager.snapshotFileForOffset(offset).toScala
       assertTrue(snapshotFileBeforeDeletion.isDefined)
       assertTrue(snapshotFileBeforeDeletion.get.file.exists)
     }
@@ -1693,7 +1693,7 @@ class LogLoaderTest {
       .filter(snapshotFile => snapshotFile.file.exists())
       .map(_.offset)
     val inMemorySnapshotFiles = (1 until 5)
-      .flatMap(offset => log.producerStateManager.snapshotFileForOffset(offset).asScala)
+      .flatMap(offset => log.producerStateManager.snapshotFileForOffset(offset).toScala)
     assertTrue(offsetsWithSnapshotFiles.isEmpty, s"Found offsets with producer state snapshot files: $offsetsWithSnapshotFiles while none were expected.")
     assertTrue(inMemorySnapshotFiles.isEmpty, s"Found in-memory producer state snapshot files: $inMemorySnapshotFiles while none were expected.")
 
@@ -1826,7 +1826,7 @@ class LogLoaderTest {
       segments,
       0L,
       0L,
-      leaderEpochCache.asJava,
+      leaderEpochCache.toJava,
       stateManager,
       new ConcurrentHashMap[String, Integer],
       isRemoteLogEnabled
diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
index 3e3d5358429..b3553c5b8ed 100755
--- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
@@ -59,8 +59,8 @@ import java.util.concurrent.{Callable, ConcurrentHashMap, Executors, TimeUnit}
 import java.util.{Optional, OptionalLong, Properties}
 import scala.annotation.nowarn
 import scala.collection.mutable.ListBuffer
-import scala.compat.java8.OptionConverters._
 import scala.jdk.CollectionConverters._
+import scala.jdk.OptionConverters.{RichOptional, RichOptionalInt}
 
 class UnifiedLogTest {
   var config: KafkaConfig = _
@@ -646,14 +646,14 @@ class UnifiedLogTest {
       baseOffset = 27)
     appendAsFollower(log, records, leaderEpoch = 19)
     assertEquals(Some(new EpochEntry(19, 27)),
-      log.leaderEpochCache.flatMap(_.latestEntry.asScala))
+      log.leaderEpochCache.flatMap(_.latestEntry.toScala))
     assertEquals(29, log.logEndOffset)
 
     def verifyTruncationClearsEpochCache(epoch: Int, truncationOffset: Long): Unit = {
       // Simulate becoming a leader
       log.maybeAssignEpochStartOffset(leaderEpoch = epoch, startOffset = log.logEndOffset)
       assertEquals(Some(new EpochEntry(epoch, 29)),
-        log.leaderEpochCache.flatMap(_.latestEntry.asScala))
+        log.leaderEpochCache.flatMap(_.latestEntry.toScala))
       assertEquals(29, log.logEndOffset)
 
       // Now we become the follower and truncate to an offset greater
@@ -661,7 +661,7 @@ class UnifiedLogTest {
       // at the end of the log should be gone
       log.truncateTo(truncationOffset)
       assertEquals(Some(new EpochEntry(19, 27)),
-        log.leaderEpochCache.flatMap(_.latestEntry.asScala))
+        log.leaderEpochCache.flatMap(_.latestEntry.toScala))
       assertEquals(29, log.logEndOffset)
     }
 
@@ -2563,12 +2563,12 @@ class UnifiedLogTest {
     val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1000, indexIntervalBytes = 1, maxMessageBytes = 64 * 1024)
     val log = createLog(logDir, logConfig)
     log.appendAsLeader(TestUtils.records(List(new SimpleRecord("foo".getBytes()))), leaderEpoch = 5)
-    assertEquals(Some(5), log.leaderEpochCache.flatMap(_.latestEpoch.asScala))
+    assertEquals(Some(5), log.leaderEpochCache.flatMap(_.latestEpoch.toScala))
 
     log.appendAsFollower(TestUtils.records(List(new SimpleRecord("foo".getBytes())),
       baseOffset = 1L,
       magicValue = RecordVersion.V1.value))
-    assertEquals(None, log.leaderEpochCache.flatMap(_.latestEpoch.asScala))
+    assertEquals(None, log.leaderEpochCache.flatMap(_.latestEpoch.toScala))
   }
 
   @nowarn("cat=deprecation")
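UnifiedLogTest additionally imports RichOptionalInt because some accessors surface the primitive java.util.OptionalInt rather than Optional[Integer]; the primitive enrichments convert to a plain boxed Option on the Scala side. A small illustration, with made-up values:

    import java.util.{OptionalInt, OptionalLong}
    import scala.jdk.OptionConverters.{RichOptionalInt, RichOptionalLong}

    val latestEpoch: OptionalInt = OptionalInt.of(5)
    assert(latestEpoch.toScala == Some(5))   // Option[Int]

    val endOffset: OptionalLong = OptionalLong.empty()
    assert(endOffset.toScala.isEmpty)        // Option[Long]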
diff --git a/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala b/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala
index d01d390813b..1ab0f0ae8e8 100644
--- a/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala
+++ b/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala
@@ -50,8 +50,8 @@ import java.nio.ByteBuffer
 import java.util.Collections
 import java.util.concurrent.atomic.AtomicReference
 import scala.collection.{Map, Seq}
-import scala.compat.java8.OptionConverters._
 import scala.jdk.CollectionConverters._
+import scala.jdk.OptionConverters.RichOption
 
 class RequestChannelTest {
   private val requestChannelMetrics: RequestChannelMetrics = mock(classOf[RequestChannelMetrics])
@@ -71,7 +71,7 @@ class RequestChannelTest {
     val loggableAlterConfigs = alterConfigs.loggableRequest.asInstanceOf[AlterConfigsRequest]
     val loggedConfig = loggableAlterConfigs.configs.get(resource)
     assertEquals(expectedValues, toMap(loggedConfig))
-    val alterConfigsDesc = RequestConvertToJson.requestDesc(alterConfigs.header, alterConfigs.requestLog.asJava, alterConfigs.isForwarded).toString
+    val alterConfigsDesc = RequestConvertToJson.requestDesc(alterConfigs.header, alterConfigs.requestLog.toJava, alterConfigs.isForwarded).toString
     assertFalse(alterConfigsDesc.contains(sensitiveValue), s"Sensitive config logged $alterConfigsDesc")
   }
 
@@ -135,7 +135,7 @@ class RequestChannelTest {
     val loggableAlterConfigs = alterConfigs.loggableRequest.asInstanceOf[IncrementalAlterConfigsRequest]
     val loggedConfig = loggableAlterConfigs.data.resources.find(resource.`type`.id, resource.name).configs
     assertEquals(expectedValues, toMap(loggedConfig))
-    val alterConfigsDesc = RequestConvertToJson.requestDesc(alterConfigs.header, alterConfigs.requestLog.asJava, alterConfigs.isForwarded).toString
+    val alterConfigsDesc = RequestConvertToJson.requestDesc(alterConfigs.header, alterConfigs.requestLog.toJava, alterConfigs.isForwarded).toString
     assertFalse(alterConfigsDesc.contains(sensitiveValue), s"Sensitive config logged $alterConfigsDesc")
   }
 
diff --git a/core/src/test/scala/unit/kafka/network/RequestConvertToJsonTest.scala b/core/src/test/scala/unit/kafka/network/RequestConvertToJsonTest.scala
index bdc7da74ddd..40452c12854 100644
--- a/core/src/test/scala/unit/kafka/network/RequestConvertToJsonTest.scala
+++ b/core/src/test/scala/unit/kafka/network/RequestConvertToJsonTest.scala
@@ -33,7 +33,7 @@ import org.junit.jupiter.api.Test
 import org.mockito.Mockito.mock
 
 import java.util.Collections
-import scala.compat.java8.OptionConverters._
+import scala.jdk.OptionConverters.RichOption
 
 class RequestConvertToJsonTest {
 
@@ -76,7 +76,7 @@ class RequestConvertToJsonTest {
     expectedNode.set("requestHeader", RequestConvertToJson.requestHeaderNode(req.header))
     expectedNode.set("request", req.requestLog.getOrElse(new TextNode("")))
 
-    val actualNode = RequestConvertToJson.requestDesc(req.header, req.requestLog.asJava, req.isForwarded)
+    val actualNode = RequestConvertToJson.requestDesc(req.header, req.requestLog.toJava, req.isForwarded)
 
     assertEquals(expectedNode, actualNode)
   }
@@ -99,7 +99,7 @@ class RequestConvertToJsonTest {
     val temporaryMemoryBytes = 8
     val messageConversionsTimeMs = 9
 
-    val expectedNode = RequestConvertToJson.requestDesc(req.header, req.requestLog.asJava, req.isForwarded).asInstanceOf[ObjectNode]
+    val expectedNode = RequestConvertToJson.requestDesc(req.header, req.requestLog.toJava, req.isForwarded).asInstanceOf[ObjectNode]
     expectedNode.set("response", res.responseLog.getOrElse(new TextNode("")))
     expectedNode.set("connection", new TextNode(req.context.connectionId))
     expectedNode.set("totalTimeMs", new DoubleNode(totalTimeMs))
@@ -116,7 +116,7 @@ class RequestConvertToJsonTest {
     expectedNode.set("temporaryMemoryBytes", new LongNode(temporaryMemoryBytes))
     expectedNode.set("messageConversionsTime", new DoubleNode(messageConversionsTimeMs))
 
-    val actualNode = RequestConvertToJson.requestDescMetrics(req.header, req.requestLog.asJava, res.responseLog.asJava, req.context, req.session, req.isForwarded,
+    val actualNode = RequestConvertToJson.requestDescMetrics(req.header, req.requestLog.toJava, res.responseLog.toJava, req.context, req.session, req.isForwarded,
       totalTimeMs, requestQueueTimeMs, apiLocalTimeMs, apiRemoteTimeMs, apiThrottleTimeMs, responseQueueTimeMs,
       responseSendTimeMs, temporaryMemoryBytes, messageConversionsTimeMs).asInstanceOf[ObjectNode]
diff --git a/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
index 315649e2731..05c3bc2eade 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
@@ -31,14 +31,14 @@ import org.apache.kafka.test.TestUtils
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Tag
 
-import scala.compat.java8.OptionConverters._
 import scala.jdk.CollectionConverters._
+import scala.jdk.OptionConverters.RichOptional
 
 @Tag("integration")
 abstract class AbstractApiVersionsRequestTest(cluster: ClusterInstance) {
 
   def sendApiVersionsRequest(request: ApiVersionsRequest, listenerName: ListenerName): ApiVersionsResponse = {
-    val socket = if (cluster.controllerListenerName().asScala.contains(listenerName)) {
+    val socket = if (cluster.controllerListenerName().toScala.contains(listenerName)) {
       cluster.controllerSocketServers().asScala.head
     } else {
       cluster.brokerSocketServers().asScala.head
@@ -89,7 +89,7 @@ abstract class AbstractApiVersionsRequestTest(cluster: ClusterInstance) {
         ApiKeys.apisForListener(ApiMessageType.ListenerType.ZK_BROKER),
         enableUnstableLastVersion
       )
-    } else if (cluster.controllerListenerName().asScala.contains(listenerName)) {
+    } else if (cluster.controllerListenerName().toScala.contains(listenerName)) {
       ApiVersionsResponse.collectApis(
         ApiKeys.apisForListener(ApiMessageType.ListenerType.CONTROLLER),
         enableUnstableLastVersion
@@ -109,7 +109,7 @@ abstract class AbstractApiVersionsRequestTest(cluster: ClusterInstance) {
 
     val defaultApiVersionsResponse = if (!cluster.isKRaftTest) {
       TestUtils.defaultApiVersionsResponse(0, ListenerType.ZK_BROKER, enableUnstableLastVersion)
-    } else if (cluster.controllerListenerName().asScala.contains(listenerName)) {
+    } else if (cluster.controllerListenerName().toScala.contains(listenerName)) {
       TestUtils.defaultApiVersionsResponse(0, ListenerType.CONTROLLER, enableUnstableLastVersion)
     } else {
       TestUtils.createApiVersionsResponse(0, expectedApis)
diff --git a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsOnPlainTextTest.scala b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsOnPlainTextTest.scala
index ca1d32e91b9..e8496b296db 100644
--- a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsOnPlainTextTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsOnPlainTextTest.scala
@@ -26,8 +26,8 @@ import org.junit.jupiter.api.Assertions.assertThrows
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
 
-import scala.compat.java8.OptionConverters
 import scala.concurrent.ExecutionException
+import scala.jdk.javaapi.OptionConverters
 
 class DelegationTokenRequestsOnPlainTextTest extends BaseRequestTest {
   var adminClient: Admin = _
diff --git a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala
index 18f7b3f538e..f311b254841 100644
--- a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala
@@ -30,9 +30,9 @@ import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
 
 import java.util
-import scala.compat.java8.OptionConverters
 import scala.concurrent.ExecutionException
 import scala.jdk.CollectionConverters._
+import scala.jdk.javaapi.OptionConverters
 
 class DelegationTokenRequestsTest extends IntegrationTestHarness with SaslSetup {
   override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT
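The delegation-token suites import the module as a plain object rather than as enrichments, which is why they land on scala.jdk.javaapi.OptionConverters: the object keeps the same name and the same toJava/toScala method shapes as the scala-java8-compat original, so every qualified call site compiles unchanged after the import swap. A sketch with an illustrative value:

    import java.util.Optional
    import scala.jdk.javaapi.OptionConverters

    val token: Option[String] = Some("delegation-token")
    val asOptional: Optional[String] = OptionConverters.toJava(token)
    assert(OptionConverters.toScala(asOptional) == token)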
diff --git a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsWithDisableTokenFeatureTest.scala b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsWithDisableTokenFeatureTest.scala
index a28d784c5cb..586ab0c289d 100644
--- a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsWithDisableTokenFeatureTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsWithDisableTokenFeatureTest.scala
@@ -27,8 +27,8 @@ import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
 
 import java.util
-import scala.compat.java8.OptionConverters
 import scala.concurrent.ExecutionException
+import scala.jdk.javaapi.OptionConverters
 
 class DelegationTokenRequestsWithDisableTokenFeatureTest extends BaseRequestTest with SaslSetup {
   override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT
diff --git a/core/src/test/scala/unit/kafka/server/MockLeaderEndPoint.scala b/core/src/test/scala/unit/kafka/server/MockLeaderEndPoint.scala
index 0d63f872e9d..3c1a398f55e 100644
--- a/core/src/test/scala/unit/kafka/server/MockLeaderEndPoint.scala
+++ b/core/src/test/scala/unit/kafka/server/MockLeaderEndPoint.scala
@@ -32,8 +32,8 @@ import org.apache.kafka.common.{TopicPartition, Uuid}
 import java.nio.ByteBuffer
 import java.util.Optional
 import scala.collection.{Map, Set, mutable}
-import scala.compat.java8.OptionConverters._
 import scala.jdk.CollectionConverters._
+import scala.jdk.OptionConverters.{RichOption, RichOptional}
 import scala.util.Random
 
 class MockLeaderEndPoint(sourceBroker: BrokerEndPoint = new BrokerEndPoint(1, host = "localhost", port = Random.nextInt()),
@@ -154,7 +154,7 @@ class MockLeaderEndPoint(sourceBroker: BrokerEndPoint = new BrokerEndPoint(1, ho
       if (state.isReadyForFetch) {
         val replicaState = replicaPartitionStateCallback(partition).getOrElse(throw new IllegalArgumentException(s"Unknown partition $partition"))
         val lastFetchedEpoch = if (isTruncationOnFetchSupported)
-          state.lastFetchedEpoch.map(_.asInstanceOf[Integer]).asJava
+          state.lastFetchedEpoch.map(_.asInstanceOf[Integer]).toJava
         else
           Optional.empty[Integer]
         fetchData.put(partition,
@@ -204,7 +204,7 @@ class MockLeaderEndPoint(sourceBroker: BrokerEndPoint = new BrokerEndPoint(1, ho
                            lastFetchedEpoch: Optional[Integer],
                            fetchOffset: Long,
                            partitionState: PartitionState): Option[FetchResponseData.EpochEndOffset] = {
-    lastFetchedEpoch.asScala.flatMap { fetchEpoch =>
+    lastFetchedEpoch.toScala.flatMap { fetchEpoch =>
       val epochEndOffset = fetchEpochEndOffsets(
         Map(topicPartition -> new EpochData()
           .setPartition(topicPartition.partition)
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 1f654c09099..b7d5ca1c249 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -90,8 +90,8 @@ import java.util.concurrent.{Callable, ConcurrentHashMap, CountDownLatch, TimeUn
 import java.util.stream.IntStream
 import java.util.{Collections, Optional, OptionalLong, Properties}
 import scala.collection.{Map, Seq, mutable}
-import scala.compat.java8.OptionConverters.RichOptionForJava8
 import scala.jdk.CollectionConverters._
+import scala.jdk.OptionConverters.RichOption
 
 object ReplicaManagerTest {
   @AfterAll
@@ -2938,7 +2938,7 @@ class ReplicaManagerTest {
       segments,
       0L,
       0L,
-      leaderEpochCache.asJava,
+      leaderEpochCache.toJava,
       producerStateManager,
       new ConcurrentHashMap[String, Integer],
       false
@@ -3318,7 +3318,7 @@ class ReplicaManagerTest {
       minBytes,
       maxBytes,
       isolation,
-      clientMetadata.asJava
+      clientMetadata.toJava
    )
 
     replicaManager.fetchMessages(
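MockLeaderEndPoint and ReplicaManagerTest need both directions in the same file, so they import the pair of enrichments at once; note that ReplicaManagerTest's old import named the compat class explicitly (RichOptionForJava8), whose standard-library counterpart is RichOption. A round-trip sketch in that style, with hypothetical epoch values:

    import java.util.Optional
    import scala.jdk.OptionConverters.{RichOption, RichOptional}

    // Outbound: Option[Int] boxed and handed to a Java Optional field.
    val lastFetchedEpoch: Option[Int] = Some(7)
    val wire: Optional[Integer] = lastFetchedEpoch.map(Int.box).toJava

    // Inbound: the Optional from the response flattened back into Scala.
    val endOffset: Option[Int] = wire.toScala.map(_.intValue * 100)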
diff --git a/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala b/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
index c5240357779..fa67126d275 100644
--- a/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
@@ -28,7 +28,8 @@ import org.apache.kafka.storage.log.metrics.BrokerTopicStats
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, Timeout}
 
-import scala.compat.java8.OptionConverters._
+import scala.jdk.OptionConverters.RichOption
+
 
 class SchedulerTest {
 
@@ -154,7 +155,7 @@ class SchedulerTest {
       segments,
       0L,
       0L,
-      leaderEpochCache.asJava,
+      leaderEpochCache.toJava,
       producerStateManager,
       new ConcurrentHashMap[String, Integer],
       false
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 15ed5c2c17d..cdfbc1a7b69 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -88,11 +88,11 @@ import java.util.{Arrays, Collections, Optional, Properties}
 import scala.annotation.nowarn
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.{Map, Seq, mutable}
-import scala.compat.java8.OptionConverters
 import scala.concurrent.duration.FiniteDuration
 import scala.concurrent.{Await, ExecutionContext, Future}
 import scala.jdk.CollectionConverters._
 import scala.jdk.OptionConverters.RichOption
+import scala.jdk.javaapi.OptionConverters
 import scala.util.{Failure, Success, Try}
 
 /**
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index 0afe222a086..5bf94e79cb4 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -141,7 +141,6 @@ versions += [
   // has the version field as mandatory in its configuration, see
   // https://github.com/scalameta/scalafmt/releases/tag/v3.1.0.
   scalafmt: "3.7.14",
-  scalaJava8Compat : "1.0.2",
   scoverage: "2.0.11",
   slf4j: "1.7.36",
   snappy: "1.1.10.5",
@@ -238,7 +237,6 @@ libs += [
   protobuf: "com.google.protobuf:protobuf-java:$versions.protobuf",
   reload4j: "ch.qos.reload4j:reload4j:$versions.reload4j",
   rocksDBJni: "org.rocksdb:rocksdbjni:$versions.rocksDB",
-  scalaJava8Compat: "org.scala-lang.modules:scala-java8-compat_$versions.baseScala:$versions.scalaJava8Compat",
   scalaLibrary: "org.scala-lang:scala-library:$versions.scala",
   scalaLogging: "com.typesafe.scala-logging:scala-logging_$versions.baseScala:$versions.scalaLogging",
   scalaReflect: "org.scala-lang:scala-reflect:$versions.scala",
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
index be5fae09335..6b21ccefff7 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
@@ -70,7 +70,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import scala.Option;
-import scala.compat.java8.OptionConverters;
+import scala.jdk.javaapi.OptionConverters;
 
 @State(Scope.Benchmark)
 @Fork(value = 1)
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
index 88d7263e806..a42e725db86 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
@@ -64,7 +64,7 @@ import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
 import scala.Option;
-import scala.compat.java8.OptionConverters;
+import scala.jdk.javaapi.OptionConverters;
 
 @State(Scope.Benchmark)
 @Fork(value = 1)
diff --git a/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/RaftClusterInvocationContext.java b/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/RaftClusterInvocationContext.java
index 8fc05ae0a94..1ec9646f00e 100644
--- a/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/RaftClusterInvocationContext.java
+++ b/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/RaftClusterInvocationContext.java
@@ -54,7 +54,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import scala.compat.java8.OptionConverters;
+import scala.jdk.javaapi.OptionConverters;
+
 
 /**
  * Wraps a {@link KafkaClusterTestKit} inside lifecycle methods for a test invocation. Each instance of this