KAFKA-17748 Remove scala-java8-compat (#17497)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
TengYao Chi 2024-10-15 13:34:21 +08:00 committed by GitHub
parent 1e5b6b8e83
commit 582bb48e88
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
60 changed files with 158 additions and 156 deletions

View File

@ -259,7 +259,6 @@ rocksdbjni-7.9.2
scala-library-2.13.15
scala-logging_2.13-3.9.5
scala-reflect-2.13.15
scala-java8-compat_2.13-1.0.2
snappy-java-1.1.10.5
swagger-annotations-2.2.8
zookeeper-3.8.4

View File

@ -1054,7 +1054,6 @@ project(':core') {
implementation libs.joptSimple
implementation libs.jose4j
implementation libs.metrics
implementation libs.scalaJava8Compat
// only needed transitively, but set it explicitly to ensure it has the same version as scala-library
implementation libs.scalaReflect
implementation libs.scalaLogging
@ -1549,7 +1548,6 @@ project(':test-common') {
}
dependencies {
implementation libs.scalaJava8Compat
implementation project(':core')
implementation project(':metadata')
implementation project(':server')
@ -1592,7 +1590,6 @@ project(':test-common:test-common-api') {
dependencies {
implementation libs.scalaJava8Compat
implementation project(':clients')
implementation project(':core')
implementation project(':group-coordinator')
@ -3300,7 +3297,6 @@ project(':jmh-benchmarks') {
implementation libs.mockitoCore
implementation libs.slf4jReload4j
implementation libs.scalaLibrary
implementation libs.scalaJava8Compat
}
tasks.withType(JavaCompile) {

View File

@ -28,7 +28,7 @@ import java.util.function.Consumer;
import java.util.function.Supplier;
import scala.Option;
import scala.compat.java8.OptionConverters;
import scala.jdk.javaapi.OptionConverters;
import scala.util.Either;
import scala.util.Left;
import scala.util.Right;

View File

@ -43,7 +43,8 @@ import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
import java.util.Collections;
import java.util.Optional;
import scala.compat.java8.OptionConverters;
import scala.jdk.javaapi.OptionConverters;
public class KafkaApisBuilder {

View File

@ -46,7 +46,8 @@ import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import scala.compat.java8.OptionConverters;
import scala.jdk.javaapi.OptionConverters;
public class ReplicaManagerBuilder {

View File

@ -29,8 +29,8 @@ import org.apache.kafka.metadata.{BrokerRegistration, VersionRange}
import org.apache.kafka.server.authorizer.AuthorizerServerInfo
import scala.collection.Seq
import scala.compat.java8.OptionConverters._
import scala.jdk.CollectionConverters._
import scala.jdk.OptionConverters.RichOptional
object Broker {
private[kafka] case class ServerInfo(clusterResource: ClusterResource,
@ -58,7 +58,7 @@ object Broker {
new Broker(
registration.id(),
registration.listeners().values().asScala.map(EndPoint.fromJava).toSeq,
registration.rack().asScala,
registration.rack().toScala,
Features.supportedFeatures(supportedFeatures(registration.supportedFeatures()))
)
}

View File

@ -29,7 +29,7 @@ import org.apache.kafka.server.{ControllerRequestCompletionHandler, NodeToContro
import org.apache.kafka.server.common.ProducerIdsBlock
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong, AtomicReference}
import scala.compat.java8.OptionConverters.RichOptionalGeneric
import scala.jdk.OptionConverters.RichOptional
import scala.util.{Failure, Success, Try}
/**
@ -180,7 +180,7 @@ class RPCProducerIdManager(brokerId: Int,
var result: Try[Long] = null
var iteration = 0
while (result == null) {
currentProducerIdBlock.get.claimNextId().asScala match {
currentProducerIdBlock.get.claimNextId().toScala match {
case None =>
// Check the next block if current block is full
val block = nextProducerIdBlock.getAndSet(null)

View File

@ -35,8 +35,8 @@ import java.util.regex.Pattern
import java.util.{Collections, Optional}
import scala.collection.mutable.ListBuffer
import scala.collection.Seq
import scala.compat.java8.OptionConverters._
import scala.jdk.CollectionConverters._
import scala.jdk.OptionConverters.RichOptional
/**
* An append-only log for storing messages locally. The log is a sequence of LogSegments, each with a base offset.
@ -415,7 +415,7 @@ class LocalLog(@volatile private var _dir: File,
val startOffsetPosition = new OffsetPosition(fetchInfo.fetchOffsetMetadata.messageOffset,
fetchInfo.fetchOffsetMetadata.relativePositionInSegment)
val upperBoundOffset = segment.fetchUpperBoundOffset(startOffsetPosition, fetchSize).orElse(
segments.higherSegment(segment.baseOffset).asScala.map(s => s.baseOffset).getOrElse(logEndOffset))
segments.higherSegment(segment.baseOffset).toScala.map(s => s.baseOffset).getOrElse(logEndOffset))
val abortedTransactions = ListBuffer.empty[FetchResponseData.AbortedTransaction]
def accumulator(abortedTxns: Seq[AbortedTxn]): Unit = abortedTransactions ++= abortedTxns.map(_.asAbortedTransaction)

View File

@ -41,7 +41,7 @@ import org.apache.kafka.server.util.Scheduler
import org.apache.kafka.storage.internals.checkpoint.{LeaderEpochCheckpointFile, PartitionMetadataFile}
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
import org.apache.kafka.storage.internals.log.LocalLog.SplitSegmentResult
import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, BatchMetadata, CompletedTxn, FetchDataInfo, LastRecord, LeaderHwChange, LocalLog => JLocalLog, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogLoader, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, LogValidator, ProducerAppendInfo, ProducerStateManager, ProducerStateManagerConfig, RollParams, UnifiedLog => JUnifiedLog, VerificationGuard}
import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, BatchMetadata, CompletedTxn, FetchDataInfo, LastRecord, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogLoader, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, LogValidator, ProducerAppendInfo, ProducerStateManager, ProducerStateManagerConfig, RollParams, VerificationGuard, LocalLog => JLocalLog, UnifiedLog => JUnifiedLog}
import org.apache.kafka.storage.log.metrics.{BrokerTopicMetrics, BrokerTopicStats}
import java.io.{File, IOException}
@ -54,8 +54,8 @@ import java.util.{Collections, Optional, OptionalInt, OptionalLong}
import scala.annotation.nowarn
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
import scala.collection.{Seq, mutable}
import scala.compat.java8.OptionConverters._
import scala.jdk.CollectionConverters._
import scala.jdk.OptionConverters.{RichOption, RichOptional, RichOptionalInt}
/**
* A log which presents a unified view of local and tiered log segments.
@ -943,7 +943,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
}
}
def latestEpoch: Option[Int] = leaderEpochCache.flatMap(_.latestEpoch.asScala)
def latestEpoch: Option[Int] = leaderEpochCache.flatMap(_.latestEpoch.toScala)
def endOffsetForEpoch(leaderEpoch: Int): Option[OffsetAndEpoch] = {
leaderEpochCache.flatMap { cache =>
@ -959,7 +959,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
private def maybeIncrementFirstUnstableOffset(): Unit = lock synchronized {
localLog.checkIfMemoryMappedBufferClosed()
val updatedFirstUnstableOffset = producerStateManager.firstUnstableOffset.asScala match {
val updatedFirstUnstableOffset = producerStateManager.firstUnstableOffset.toScala match {
case Some(logOffsetMetadata) if logOffsetMetadata.messageOffsetOnly || logOffsetMetadata.messageOffset < logStartOffset =>
val offset = math.max(logOffsetMetadata.messageOffset, logStartOffset)
Some(maybeConvertToOffsetMetadata(offset))
@ -1282,7 +1282,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
// The first cached epoch usually corresponds to the log start offset, but we have to verify this since
// it may not be true following a message format version bump as the epoch will not be available for
// log entries written in the older format.
val earliestEpochEntry = leaderEpochCache.asJava.flatMap(_.earliestEntry())
val earliestEpochEntry = leaderEpochCache.toJava.flatMap(_.earliestEntry())
val epochOpt = if (earliestEpochEntry.isPresent && earliestEpochEntry.get().startOffset <= logStartOffset) {
Optional.of[Integer](earliestEpochEntry.get().epoch)
} else Optional.empty[Integer]()
@ -1340,7 +1340,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
val position = latestTimestampSegment.offsetIndex.lookup(maxTimestampSoFar.offset)
val timestampAndOffsetOpt = latestTimestampSegment.log.batchesFrom(position.position).asScala
.find(_.maxTimestamp() == maxTimestampSoFar.timestamp)
.flatMap(batch => batch.offsetOfMaxTimestamp().asScala.map(new TimestampAndOffset(batch.maxTimestamp(), _,
.flatMap(batch => batch.offsetOfMaxTimestamp().toScala.map(new TimestampAndOffset(batch.maxTimestamp(), _,
Optional.of[Integer](batch.partitionLeaderEpoch()).filter(_ >= 0))))
OffsetResultHolder(timestampAndOffsetOpt)
} else {
@ -1376,7 +1376,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
// constant time access while being safe to use with concurrent collections unlike `toArray`.
val segmentsCopy = logSegments.asScala.toBuffer
val targetSeg = segmentsCopy.find(_.largestTimestamp >= targetTimestamp)
targetSeg.flatMap(_.findOffsetByTimestamp(targetTimestamp, startOffset).asScala)
targetSeg.flatMap(_.findOffsetByTimestamp(targetTimestamp, startOffset).toScala)
}
def legacyFetchOffsetsBefore(timestamp: Long, maxNumOffsets: Int): Seq[Long] = {
@ -2051,7 +2051,7 @@ object UnifiedLog extends Logging {
segments,
logStartOffset,
recoveryPoint,
leaderEpochCache.asJava,
leaderEpochCache.toJava,
producerStateManager,
numRemainingSegments,
isRemoteLogEnabled,

View File

@ -38,7 +38,7 @@ import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.network.RequestConvertToJson
import scala.jdk.CollectionConverters._
import scala.compat.java8.OptionConverters._
import scala.jdk.OptionConverters.RichOption
import scala.reflect.ClassTag
object RequestChannel extends Logging {
@ -250,7 +250,7 @@ object RequestChannel extends Logging {
recordNetworkThreadTimeCallback.foreach(record => record(networkThreadTimeNanos))
if (isRequestLoggingEnabled) {
val desc = RequestConvertToJson.requestDescMetrics(header, requestLog.asJava, response.responseLog.asJava,
val desc = RequestConvertToJson.requestDescMetrics(header, requestLog.toJava, response.responseLog.toJava,
context, session, isForwarded,
totalTimeMs, requestQueueTimeMs, apiLocalTimeMs,
apiRemoteTimeMs, apiThrottleTimeMs, responseQueueTimeMs,

View File

@ -49,7 +49,7 @@ import java.nio.file.{Files, NoSuchFileException, Path}
import java.util.{Optional, Properties}
import scala.annotation.nowarn
import scala.collection.mutable
import scala.compat.java8.OptionConverters._
import scala.jdk.OptionConverters.{RichOption, RichOptional}
final class KafkaMetadataLog private (
val log: UnifiedLog,
@ -134,7 +134,7 @@ final class KafkaMetadataLog private (
}
override def endOffsetForEpoch(epoch: Int): OffsetAndEpoch = {
(log.endOffsetForEpoch(epoch), earliestSnapshotId().asScala) match {
(log.endOffsetForEpoch(epoch), earliestSnapshotId().toScala) match {
case (Some(offsetAndEpoch), Some(snapshotId)) if (
offsetAndEpoch.offset == snapshotId.offset &&
offsetAndEpoch.leaderEpoch == epoch) =>
@ -176,7 +176,7 @@ final class KafkaMetadataLog private (
override def truncateToLatestSnapshot(): Boolean = {
val latestEpoch = log.latestEpoch.getOrElse(0)
val (truncated, forgottenSnapshots) = latestSnapshotId().asScala match {
val (truncated, forgottenSnapshots) = latestSnapshotId().toScala match {
case Some(snapshotId) if (
snapshotId.epoch > latestEpoch ||
(snapshotId.epoch == latestEpoch && snapshotId.offset > endOffset().offset)
@ -202,7 +202,7 @@ final class KafkaMetadataLog private (
override def updateHighWatermark(offsetMetadata: LogOffsetMetadata): Unit = {
// This API returns the new high watermark, which may be different from the passed offset
val logHighWatermark = offsetMetadata.metadata.asScala match {
val logHighWatermark = offsetMetadata.metadata.toScala match {
case Some(segmentPosition: SegmentPosition) =>
log.updateHighWatermark(
new internals.log.LogOffsetMetadata(
@ -317,7 +317,7 @@ final class KafkaMetadataLog private (
value
}
reader.asJava.asInstanceOf[Optional[RawSnapshotReader]]
reader.toJava.asInstanceOf[Optional[RawSnapshotReader]]
}
}
@ -329,13 +329,13 @@ final class KafkaMetadataLog private (
override def latestSnapshotId(): Optional[OffsetAndEpoch] = {
snapshots synchronized {
snapshots.lastOption.map { case (snapshotId, _) => snapshotId }.asJava
snapshots.lastOption.map { case (snapshotId, _) => snapshotId }.toJava
}
}
override def earliestSnapshotId(): Optional[OffsetAndEpoch] = {
snapshots synchronized {
snapshots.headOption.map { case (snapshotId, _) => snapshotId }.asJava
snapshots.headOption.map { case (snapshotId, _) => snapshotId }.toJava
}
}
@ -363,7 +363,7 @@ final class KafkaMetadataLog private (
private def deleteBeforeSnapshot(snapshotId: OffsetAndEpoch, reason: SnapshotDeletionReason): Boolean = {
val (deleted, forgottenSnapshots) = snapshots synchronized {
latestSnapshotId().asScala match {
latestSnapshotId().toScala match {
case Some(latestSnapshotId) if
snapshots.contains(snapshotId) &&
startOffset < snapshotId.offset &&
@ -387,7 +387,7 @@ final class KafkaMetadataLog private (
*/
private def loadSnapshotSizes(): Seq[(OffsetAndEpoch, Long)] = {
snapshots.keys.toSeq.flatMap {
snapshotId => readSnapshot(snapshotId).asScala.map { reader => (snapshotId, reader.sizeInBytes())}
snapshotId => readSnapshot(snapshotId).toScala.map { reader => (snapshotId, reader.sizeInBytes())}
}
}
@ -395,7 +395,7 @@ final class KafkaMetadataLog private (
* Return the max timestamp of the first batch in a snapshot, if the snapshot exists and has records
*/
private def readSnapshotTimestamp(snapshotId: OffsetAndEpoch): Option[Long] = {
readSnapshot(snapshotId).asScala.map { reader =>
readSnapshot(snapshotId).toScala.map { reader =>
Snapshots.lastContainedLogTimestamp(reader)
}
}

View File

@ -44,8 +44,8 @@ import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.locks.ReentrantLock
import scala.collection.{Map, Set, mutable}
import scala.compat.java8.OptionConverters._
import scala.jdk.CollectionConverters._
import scala.jdk.OptionConverters.{RichOption, RichOptionalInt}
import scala.math._
/**
@ -266,7 +266,7 @@ abstract class AbstractFetcherThread(name: String,
case Errors.FENCED_LEADER_EPOCH =>
val currentLeaderEpoch = latestEpochsForPartitions.get(tp)
.map(epochEndOffset => Int.box(epochEndOffset.currentLeaderEpoch)).asJava
.map(epochEndOffset => Int.box(epochEndOffset.currentLeaderEpoch)).toJava
if (onPartitionFenced(tp, currentLeaderEpoch))
partitionsWithError += tp
@ -365,7 +365,7 @@ abstract class AbstractFetcherThread(name: String,
// ReplicaDirAlterThread may have removed topicPartition from the partitionStates after processing the partition data
if ((validBytes > 0 || currentFetchState.lag.isEmpty) && partitionStates.contains(topicPartition)) {
val lastFetchedEpoch =
if (logAppendInfo.lastLeaderEpoch.isPresent) logAppendInfo.lastLeaderEpoch.asScala else currentFetchState.lastFetchedEpoch
if (logAppendInfo.lastLeaderEpoch.isPresent) logAppendInfo.lastLeaderEpoch.toScala else currentFetchState.lastFetchedEpoch
// Update partitionStates only if there is no exception during processPartitionData
val newFetchState = PartitionFetchState(currentFetchState.topicId, nextOffset, Some(lag),
currentFetchState.currentLeaderEpoch, state = Fetching, lastFetchedEpoch)

View File

@ -36,8 +36,8 @@ import java.util
import java.util.concurrent.CompletableFuture
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable
import scala.compat.java8.OptionConverters._
import scala.jdk.CollectionConverters._
import scala.jdk.OptionConverters.RichOptional
/**
* Logic to handle ACL requests.
@ -112,7 +112,7 @@ class AclApis(authHelper: AuthHelper,
val aclCreationResults = allBindings.map { acl =>
val result = errorResults.getOrElse(acl, createResults(validBindings.indexOf(acl)).get)
val creationResult = new AclCreationResult()
result.exception.asScala.foreach { throwable =>
result.exception.toScala.foreach { throwable =>
val apiError = ApiError.fromThrowable(throwable)
creationResult
.setErrorCode(apiError.error.code)

View File

@ -39,7 +39,7 @@ import org.apache.kafka.server.util.Scheduler
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
import scala.compat.java8.OptionConverters._
import scala.jdk.OptionConverters.RichOptional
/**
* Handles updating the ISR by sending AlterPartition requests to the controller (as of 2.7) or by updating ZK directly
@ -328,7 +328,7 @@ class DefaultAlterPartitionManager(
val apiError = Errors.forCode(partition.errorCode)
debug(s"Controller successfully handled AlterPartition request for $tp: $partition")
if (apiError == Errors.NONE) {
LeaderRecoveryState.optionalOf(partition.leaderRecoveryState).asScala match {
LeaderRecoveryState.optionalOf(partition.leaderRecoveryState).toScala match {
case Some(leaderRecoveryState) =>
partitionResponses(tp) = Right(
new LeaderAndIsr(

View File

@ -37,8 +37,8 @@ import org.apache.kafka.coordinator.share.ShareCoordinator
import org.apache.kafka.server.{ControllerRequestCompletionHandler, NodeToControllerChannelManager}
import scala.collection.{Map, Seq, Set, mutable}
import scala.compat.java8.OptionConverters._
import scala.jdk.CollectionConverters._
import scala.jdk.OptionConverters.RichOptional
trait AutoTopicCreationManager {
@ -195,7 +195,7 @@ class DefaultAutoTopicCreationManager(
val request = metadataRequestContext.map { context =>
val requestVersion =
channelManager.controllerApiVersions.asScala match {
channelManager.controllerApiVersions.toScala match {
case None =>
// We will rely on the Metadata request to be retried in the case
// that the latest version is not usable by the controller.

View File

@ -64,8 +64,8 @@ import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.{Condition, ReentrantLock}
import java.util.concurrent.{CompletableFuture, ExecutionException, TimeUnit, TimeoutException}
import scala.collection.Map
import scala.compat.java8.OptionConverters.RichOptionForJava8
import scala.jdk.CollectionConverters._
import scala.jdk.OptionConverters.RichOption
/**
@ -576,7 +576,7 @@ class BrokerServer(
// authorizer future is completed.
val endpointReadyFutures = {
val builder = new EndpointReadyFutures.Builder()
builder.build(authorizer.asJava,
builder.build(authorizer.toJava,
new KafkaAuthorizerServerInfo(
new ClusterResource(clusterId),
config.nodeId,
@ -679,7 +679,7 @@ class BrokerServer(
protected def createRemoteLogManager(): Option[RemoteLogManager] = {
if (config.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) {
Some(new RemoteLogManager(config.remoteLogManagerConfig, config.brokerId, config.logDirs.head, clusterId, time,
(tp: TopicPartition) => logManager.getLog(tp).asJava,
(tp: TopicPartition) => logManager.getLog(tp).toJava,
(tp: TopicPartition, remoteLogStartOffset: java.lang.Long) => {
logManager.getLog(tp).foreach { log =>
log.updateLogStartOffsetFromRemoteTier(remoteLogStartOffset)

View File

@ -39,8 +39,8 @@ import org.apache.kafka.storage.internals.log.LogConfig
import scala.collection.mutable.ListBuffer
import scala.collection.{Map, mutable}
import scala.compat.java8.OptionConverters._
import scala.jdk.CollectionConverters._
import scala.jdk.OptionConverters.RichOptional
class ConfigHelper(metadataCache: MetadataCache, config: KafkaConfig, configRepository: ConfigRepository) extends Logging {
@ -188,7 +188,7 @@ class ConfigHelper(metadataCache: MetadataCache, config: KafkaConfig, configRepo
def createGroupConfigEntry(groupConfig: GroupConfig, groupProps: Properties, includeSynonyms: Boolean, includeDocumentation: Boolean)
(name: String, value: Any): DescribeConfigsResponseData.DescribeConfigsResourceResult = {
val allNames = brokerSynonyms(name)
val configEntryType = GroupConfig.configType(name).asScala
val configEntryType = GroupConfig.configType(name).toScala
val isSensitive = KafkaConfig.maybeSensitive(configEntryType)
val valueAsString = if (isSensitive) null else ConfigDef.convertToString(value, configEntryType.orNull)
val allSynonyms = {
@ -212,7 +212,7 @@ class ConfigHelper(metadataCache: MetadataCache, config: KafkaConfig, configRepo
def createTopicConfigEntry(logConfig: LogConfig, topicProps: Properties, includeSynonyms: Boolean, includeDocumentation: Boolean)
(name: String, value: Any): DescribeConfigsResponseData.DescribeConfigsResourceResult = {
val configEntryType = LogConfig.configType(name).asScala
val configEntryType = LogConfig.configType(name).toScala
val isSensitive = KafkaConfig.maybeSensitive(configEntryType)
val valueAsString = if (isSensitive) null else ConfigDef.convertToString(value, configEntryType.orNull)
val allSynonyms = {

View File

@ -53,8 +53,8 @@ import java.util
import java.util.{Optional, OptionalLong}
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.{CompletableFuture, TimeUnit}
import scala.compat.java8.OptionConverters._
import scala.jdk.CollectionConverters._
import scala.jdk.OptionConverters.RichOption
/**
@ -172,7 +172,7 @@ class ControllerServer(
val endpointReadyFutures = {
val builder = new EndpointReadyFutures.Builder()
builder.build(authorizer.asJava,
builder.build(authorizer.toJava,
new KafkaAuthorizerServerInfo(
new ClusterResource(clusterId),
config.nodeId,
@ -229,8 +229,8 @@ class ControllerServer(
setLeaderImbalanceCheckIntervalNs(leaderImbalanceCheckIntervalNs).
setMaxIdleIntervalNs(maxIdleIntervalNs).
setMetrics(quorumControllerMetrics).
setCreateTopicPolicy(createTopicPolicy.asJava).
setAlterConfigPolicy(alterConfigPolicy.asJava).
setCreateTopicPolicy(createTopicPolicy.toJava).
setAlterConfigPolicy(alterConfigPolicy.toJava).
setConfigurationValidator(new ControllerConfigurationValidator(sharedServer.brokerConfig)).
setStaticConfig(config.originals).
setBootstrapMetadata(bootstrapMetadata).

View File

@ -26,7 +26,8 @@ import org.apache.kafka.common.requests.{EnvelopeRequest, RequestContext, Reques
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.network.metrics.RequestChannelMetrics
import scala.compat.java8.OptionConverters._
import scala.jdk.OptionConverters.RichOptional
object EnvelopeUtils {
def handleEnvelopeRequest(
@ -124,7 +125,7 @@ object EnvelopeUtils {
envelopeContext: RequestContext,
principalBytes: Array[Byte]
): KafkaPrincipal = {
envelopeContext.principalSerde.asScala match {
envelopeContext.principalSerde.toScala match {
case Some(serde) =>
try {
serde.deserialize(principalBytes)

View File

@ -28,7 +28,7 @@ import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, Enve
import org.apache.kafka.server.{ControllerRequestCompletionHandler, NodeToControllerChannelManager}
import java.util.concurrent.TimeUnit
import scala.compat.java8.OptionConverters._
import scala.jdk.OptionConverters.RichOptional
trait ForwardingManager {
def close(): Unit
@ -98,7 +98,7 @@ object ForwardingManager {
private[server] def buildEnvelopeRequest(context: RequestContext,
forwardRequestBuffer: ByteBuffer): EnvelopeRequest.Builder = {
val principalSerde = context.principalSerde.asScala.getOrElse(
val principalSerde = context.principalSerde.toScala.getOrElse(
throw new IllegalArgumentException(s"Cannot deserialize principal from request context $context " +
"since there is no serde defined")
)
@ -188,7 +188,7 @@ class ForwardingManagerImpl(
forwardingManagerMetrics.close()
override def controllerApiVersions: Option[NodeApiVersions] =
channelManager.controllerApiVersions.asScala
channelManager.controllerApiVersions.toScala
private def parseResponse(
buffer: ByteBuffer,

View File

@ -54,9 +54,9 @@ import org.apache.kafka.storage.internals.log.LogConfig.MessageFormatVersion
import org.apache.zookeeper.client.ZKClientConfig
import scala.annotation.nowarn
import scala.compat.java8.OptionConverters._
import scala.jdk.CollectionConverters._
import scala.collection.{Map, Seq}
import scala.jdk.OptionConverters.RichOptional
object KafkaConfig {
@ -149,8 +149,8 @@ object KafkaConfig {
def loggableValue(resourceType: ConfigResource.Type, name: String, value: String): String = {
val maybeSensitive = resourceType match {
case ConfigResource.Type.BROKER => KafkaConfig.maybeSensitive(KafkaConfig.configType(name))
case ConfigResource.Type.TOPIC => KafkaConfig.maybeSensitive(LogConfig.configType(name).asScala)
case ConfigResource.Type.GROUP => KafkaConfig.maybeSensitive(GroupConfig.configType(name).asScala)
case ConfigResource.Type.TOPIC => KafkaConfig.maybeSensitive(LogConfig.configType(name).toScala)
case ConfigResource.Type.GROUP => KafkaConfig.maybeSensitive(GroupConfig.configType(name).toScala)
case ConfigResource.Type.BROKER_LOGGER => false
case ConfigResource.Type.CLIENT_METRICS => false
case _ => true

View File

@ -76,8 +76,8 @@ import java.util.concurrent._
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import java.util.{Optional, OptionalInt, OptionalLong}
import scala.collection.{Map, Seq}
import scala.compat.java8.OptionConverters.RichOptionForJava8
import scala.jdk.CollectionConverters._
import scala.jdk.OptionConverters.RichOption
object KafkaServer {
def zkClientConfigFromKafkaConfig(config: KafkaConfig, forceZkSslClientEnable: Boolean = false): ZKClientConfig = {
@ -702,7 +702,7 @@ class KafkaServer(
protected def createRemoteLogManager(): Option[RemoteLogManager] = {
if (config.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) {
Some(new RemoteLogManager(config.remoteLogManagerConfig, config.brokerId, config.logDirs.head, clusterId, time,
(tp: TopicPartition) => logManager.getLog(tp).asJava,
(tp: TopicPartition) => logManager.getLog(tp).toJava,
(tp: TopicPartition, remoteLogStartOffset: java.lang.Long) => {
logManager.getLog(tp).foreach { log =>
log.updateLogStartOffsetFromRemoteTier(remoteLogStartOffset)

View File

@ -34,8 +34,8 @@ import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams, FetchPa
import java.util
import java.util.Optional
import scala.collection.{Map, Seq, Set, mutable}
import scala.compat.java8.OptionConverters.RichOptionForJava8
import scala.jdk.CollectionConverters._
import scala.jdk.OptionConverters.RichOption
/**
* Facilitates fetches from a local replica leader.
@ -206,7 +206,7 @@ class LocalLeaderEndPoint(sourceBroker: BrokerEndPoint,
try {
val logStartOffset = replicaManager.futureLocalLogOrException(topicPartition).logStartOffset
val lastFetchedEpoch = if (isTruncationOnFetchSupported)
fetchState.lastFetchedEpoch.map(_.asInstanceOf[Integer]).asJava
fetchState.lastFetchedEpoch.map(_.asInstanceOf[Integer]).toJava
else
Optional.empty[Integer]
val topicId = fetchState.topicId.getOrElse(Uuid.ZERO_UUID)

View File

@ -38,8 +38,8 @@ import org.apache.kafka.server.util.{InterBrokerSendThread, RequestAndCompletion
import java.util
import java.util.Optional
import scala.collection.Seq
import scala.compat.java8.OptionConverters._
import scala.jdk.CollectionConverters._
import scala.jdk.OptionConverters.{RichOption, RichOptionalInt}
case class ControllerInformation(
node: Option[Node],
@ -115,7 +115,7 @@ class RaftControllerNodeProvider(
private def idToNode(id: Int): Option[Node] = raftManager.voterNode(id, listenerName)
override def getControllerInfo(): ControllerInformation =
ControllerInformation(raftManager.leaderAndEpoch.leaderId.asScala.flatMap(idToNode),
ControllerInformation(raftManager.leaderAndEpoch.leaderId.toScala.flatMap(idToNode),
listenerName, securityProtocol, saslMechanism, isZkController = false)
}
@ -231,7 +231,7 @@ class NodeToControllerChannelManagerImpl(
def controllerApiVersions(): Optional[NodeApiVersions] = {
requestThread.activeControllerAddress().flatMap { activeController =>
Option(apiVersions.get(activeController.idString))
}.asJava
}.toJava
}
def getTimeoutMs: Long = retryTimeoutMs

View File

@ -30,12 +30,12 @@ import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.{OffsetFo
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, ListOffsetsRequest, ListOffsetsResponse, OffsetsForLeaderEpochRequest, OffsetsForLeaderEpochResponse}
import org.apache.kafka.server.common.{OffsetAndEpoch, MetadataVersion}
import org.apache.kafka.server.common.{MetadataVersion, OffsetAndEpoch}
import org.apache.kafka.server.common.MetadataVersion.IBP_0_10_1_IV2
import scala.jdk.CollectionConverters._
import scala.collection.{Map, mutable}
import scala.compat.java8.OptionConverters.RichOptionForJava8
import scala.jdk.OptionConverters.RichOption
/**
* Facilitates fetches from a remote replica leader.
@ -187,7 +187,7 @@ class RemoteLeaderEndPoint(logPrefix: String,
try {
val logStartOffset = replicaManager.localLogOrException(topicPartition).logStartOffset
val lastFetchedEpoch = if (isTruncationOnFetchSupported)
fetchState.lastFetchedEpoch.map(_.asInstanceOf[Integer]).asJava
fetchState.lastFetchedEpoch.map(_.asInstanceOf[Integer]).toJava
else
Optional.empty[Integer]
builder.add(topicPartition, new FetchRequest.PartitionData(

View File

@ -73,8 +73,8 @@ import java.util.concurrent.locks.Lock
import java.util.concurrent.{CompletableFuture, Future, RejectedExecutionException, TimeUnit}
import java.util.{Collections, Optional, OptionalInt, OptionalLong}
import scala.collection.{Map, Seq, Set, immutable, mutable}
import scala.compat.java8.OptionConverters._
import scala.jdk.CollectionConverters._
import scala.jdk.OptionConverters.{RichOption, RichOptional}
/*
* Result metadata of a log append operation on the log
@ -142,7 +142,7 @@ case class LogReadResult(info: FetchDataInfo,
this.highWatermark,
this.leaderLogStartOffset,
this.info.records,
this.divergingEpoch.asJava,
this.divergingEpoch.toJava,
if (this.lastStableOffset.isDefined) OptionalLong.of(this.lastStableOffset.get) else OptionalLong.empty(),
this.info.abortedTransactions,
if (this.preferredReadReplica.isDefined) OptionalInt.of(this.preferredReadReplica.get) else OptionalInt.empty(),
@ -1778,7 +1778,7 @@ class ReplicaManager(val config: KafkaConfig,
throw new InconsistentTopicIdException("Topic ID in the fetch session did not match the topic ID in the log.")
// If we are the leader, determine the preferred read-replica
val preferredReadReplica = params.clientMetadata.asScala.flatMap(
val preferredReadReplica = params.clientMetadata.toScala.flatMap(
metadata => findPreferredReadReplica(partition, metadata, params.replicaId, fetchInfo.fetchOffset, fetchTimeMs))
if (preferredReadReplica.isDefined) {
@ -1813,7 +1813,7 @@ class ReplicaManager(val config: KafkaConfig,
val fetchDataInfo = checkFetchDataInfo(partition, readInfo.fetchedData)
LogReadResult(info = fetchDataInfo,
divergingEpoch = readInfo.divergingEpoch.asScala,
divergingEpoch = readInfo.divergingEpoch.toScala,
highWatermark = readInfo.highWatermark,
leaderLogStartOffset = readInfo.logStartOffset,
leaderLogEndOffset = readInfo.logEndOffset,
@ -1980,7 +1980,7 @@ class ReplicaManager(val config: KafkaConfig,
replicaInfoSet.add(leaderReplica)
val partitionInfo = new DefaultPartitionView(replicaInfoSet.asJava, leaderReplica)
replicaSelector.select(partition.topicPartition, clientMetadata, partitionInfo).asScala.collect {
replicaSelector.select(partition.topicPartition, clientMetadata, partitionInfo).toScala.collect {
// Even though the replica selector can return the leader, we don't want to send it out with the
// FetchResponse, so we exclude it here
case selected if !selected.endpoint.isEmpty && selected != leaderReplica => selected.endpoint.id
@ -3008,7 +3008,7 @@ class ReplicaManager(val config: KafkaConfig,
partitionsToStartFetching.foreachEntry { (topicPartition, partition) =>
val nodeOpt = partition.leaderReplicaIdOpt
.flatMap(leaderId => Option(newImage.cluster.broker(leaderId)))
.flatMap(_.node(listenerName).asScala)
.flatMap(_.node(listenerName).toScala)
nodeOpt match {
case Some(node) =>

View File

@ -28,7 +28,8 @@ import java.net.{InetAddress, UnknownHostException}
import org.apache.kafka.image.{ClientQuotaDelta, ClientQuotasDelta}
import org.apache.kafka.server.config.{QuotaConfigs, ZooKeeperInternals}
import scala.compat.java8.OptionConverters._
import scala.jdk.OptionConverters.RichOptionalDouble
// A strict hierarchy of entities that we support
@ -97,7 +98,7 @@ class ClientQuotaMetadataManager(private[metadata] val quotaManagers: QuotaManag
}
}
quotaDelta.changes().forEach { (key, value) =>
handleUserClientQuotaChange(userClientEntity, key, value.asScala)
handleUserClientQuotaChange(userClientEntity, key, value.toScala)
}
} else {
warn(s"Ignoring unsupported quota entity $entity.")
@ -124,7 +125,7 @@ class ClientQuotaMetadataManager(private[metadata] val quotaManagers: QuotaManag
warn(s"Ignoring unexpected quota key $quotaName for entity $ipEntity")
} else {
try {
connectionQuotas.updateIpConnectionRateQuota(inetAddress, quotaValue.asScala.map(_.toInt))
connectionQuotas.updateIpConnectionRateQuota(inetAddress, quotaValue.toScala.map(_.toInt))
} catch {
case t: Throwable => error(s"Failed to update IP quota $ipEntity", t)
}

View File

@ -41,8 +41,8 @@ import java.util.function.Supplier
import java.util.{Collections, Properties}
import scala.collection.mutable.ListBuffer
import scala.collection.{Map, Seq, Set, mutable}
import scala.compat.java8.OptionConverters._
import scala.jdk.CollectionConverters._
import scala.jdk.OptionConverters.RichOptional
import scala.util.control.Breaks._
@ -238,7 +238,7 @@ class KRaftMetadataCache(
* @return None if broker is not alive or if the broker does not have a listener named `listenerName`.
*/
private def getAliveEndpoint(image: MetadataImage, id: Int, listenerName: ListenerName): Option[Node] = {
Option(image.cluster().broker(id)).flatMap(_.node(listenerName.value()).asScala)
Option(image.cluster().broker(id)).flatMap(_.node(listenerName.value()).toScala)
}
// errorUnavailableEndpoints exists to support v0 MetadataResponses
@ -373,12 +373,12 @@ class KRaftMetadataCache(
override def getAliveBrokerNode(brokerId: Int, listenerName: ListenerName): Option[Node] = {
Option(_currentImage.cluster().broker(brokerId)).filterNot(_.fenced()).
flatMap(_.node(listenerName.value()).asScala)
flatMap(_.node(listenerName.value()).toScala)
}
override def getAliveBrokerNodes(listenerName: ListenerName): Seq[Node] = {
_currentImage.cluster().brokers().values().asScala.filterNot(_.fenced()).
flatMap(_.node(listenerName.value()).asScala).toSeq
flatMap(_.node(listenerName.value()).toScala).toSeq
}
// Does NOT include offline replica metadata
@ -478,7 +478,7 @@ class KRaftMetadataCache(
val nodes = new util.HashMap[Integer, Node]
image.cluster().brokers().values().forEach { broker =>
if (!broker.fenced()) {
broker.node(listenerName.value()).asScala.foreach { node =>
broker.node(listenerName.value()).toScala.foreach { node =>
nodes.put(broker.id(), node)
}
}

View File

@ -30,7 +30,7 @@ import org.apache.kafka.common.resource.ResourceType
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.security.authorizer.AclEntry
import org.apache.kafka.server.config.{ServerConfigs, ReplicationConfigs, ServerLogConfigs}
import org.apache.kafka.server.config.{ReplicationConfigs, ServerConfigs, ServerLogConfigs}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo, Timeout}
import org.junit.jupiter.params.ParameterizedTest
@ -38,7 +38,7 @@ import org.junit.jupiter.params.provider.ValueSource
import scala.jdk.CollectionConverters._
import scala.collection.Seq
import scala.compat.java8.OptionConverters._
import scala.jdk.OptionConverters.RichOption
/**
* Base integration test cases for [[Admin]]. Each test case added here will be executed
@ -78,7 +78,7 @@ abstract class BaseAdminIntegrationTest extends IntegrationTestHarness with Logg
val newTopics = Seq(
new NewTopic("mytopic", Map((0: Integer) -> Seq[Integer](1, 2).asJava, (1: Integer) -> Seq[Integer](2, 0).asJava).asJava),
new NewTopic("mytopic2", 3, 3.toShort),
new NewTopic("mytopic3", Option.empty[Integer].asJava, Option.empty[java.lang.Short].asJava)
new NewTopic("mytopic3", Option.empty[Integer].toJava, Option.empty[java.lang.Short].toJava)
)
val validateResult = client.createTopics(newTopics.asJava, new CreateTopicsOptions().validateOnly(true))
validateResult.all.get()

View File

@ -41,9 +41,9 @@ import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import scala.collection.mutable
import scala.compat.java8.OptionConverters
import scala.concurrent.ExecutionException
import scala.jdk.CollectionConverters._
import scala.jdk.javaapi.OptionConverters
abstract class BaseProducerSendTest extends KafkaServerTestHarness {

View File

@ -13,6 +13,7 @@
package kafka.api
import kafka.security.JaasTestUtils
import java.util
import java.util.Properties
import kafka.security.authorizer.AclAuthorizer
@ -30,8 +31,8 @@ import org.apache.kafka.server.config.{ServerConfigs, ZkConfigs}
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNull}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
import scala.compat.java8.OptionConverters
import scala.jdk.CollectionConverters._
import scala.jdk.javaapi.OptionConverters
object DescribeAuthorizedOperationsTest {
val Group1 = "group1"

View File

@ -37,7 +37,7 @@ import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
import scala.collection.mutable
import scala.collection.Seq
import scala.compat.java8.OptionConverters
import scala.jdk.javaapi.OptionConverters
/**
* A helper class for writing integration tests that involve producers, consumers, and servers

View File

@ -41,7 +41,7 @@ import org.apache.kafka.common.config.{ConfigResource, LogLevelConfig, SslConfig
import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity, ClientQuotaFilter}
import org.apache.kafka.common.requests.{DeleteRecordsRequest}
import org.apache.kafka.common.requests.DeleteRecordsRequest
import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourceType}
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer}
import org.apache.kafka.common.utils.{Time, Utils}
@ -62,10 +62,10 @@ import org.slf4j.LoggerFactory
import java.util.AbstractMap.SimpleImmutableEntry
import scala.annotation.nowarn
import scala.collection.Seq
import scala.compat.java8.OptionConverters._
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}
import scala.jdk.CollectionConverters._
import scala.jdk.OptionConverters.RichOption
import scala.util.{Random, Using}
/**
@ -653,7 +653,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
val newTopics = Seq(
new NewTopic("mytopic", Map((0: Integer) -> Seq[Integer](1, 2).asJava, (1: Integer) -> Seq[Integer](2, 0).asJava).asJava),
new NewTopic("mytopic2", 3, 3.toShort),
new NewTopic("mytopic3", Option.empty[Integer].asJava, Option.empty[java.lang.Short].asJava)
new NewTopic("mytopic3", Option.empty[Integer].toJava, Option.empty[java.lang.Short].toJava)
)
val createResult = client.createTopics(newTopics.asJava)
createResult.all.get()
@ -3393,7 +3393,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
(1: Integer) -> Seq[Integer](2, 0).asJava).asJava).
configs(Collections.singletonMap(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, "9999999")),
new NewTopic("bar", 3, 3.toShort),
new NewTopic("baz", Option.empty[Integer].asJava, Option.empty[java.lang.Short].asJava)
new NewTopic("baz", Option.empty[Integer].toJava, Option.empty[java.lang.Short].toJava)
)
val result = client.createTopics(newTopics.asJava)
result.all.get()

View File

@ -13,6 +13,7 @@
package kafka.api
import kafka.security.JaasTestUtils
import java.time.Duration
import java.util.{Collections, Properties}
import java.util.concurrent.{ExecutionException, TimeUnit}
@ -32,7 +33,8 @@ import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import scala.compat.java8.OptionConverters
import scala.jdk.javaapi.OptionConverters
class SaslClientsWithInvalidCredentialsTest extends AbstractSaslTest {
private val kafkaClientSaslMechanism = "SCRAM-SHA-256"

View File

@ -37,8 +37,8 @@ import javax.security.auth.Subject
import javax.security.auth.callback._
import javax.security.auth.login.AppConfigurationEntry
import scala.collection.Seq
import scala.compat.java8.OptionConverters
import scala.jdk.CollectionConverters._
import scala.jdk.javaapi.OptionConverters
object SaslPlainSslEndToEndAuthorizationTest {

View File

@ -39,9 +39,9 @@ import java.util
import java.util.Properties
import javax.security.auth.login.Configuration
import scala.collection.Seq
import scala.compat.java8.OptionConverters
import scala.jdk.CollectionConverters._
import scala.jdk.OptionConverters._
import scala.jdk.javaapi.OptionConverters
import scala.util.Using
/*

View File

@ -41,9 +41,9 @@ import org.junit.jupiter.params.provider.ValueSource
import java.util
import java.util.Collections
import scala.collection.Seq
import scala.compat.java8.OptionConverters._
import scala.concurrent.ExecutionException
import scala.jdk.CollectionConverters._
import scala.jdk.OptionConverters.RichOption
import scala.util.{Failure, Success, Try}
@Timeout(120)
@ -583,7 +583,7 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu
val configsOverride = Map(TopicConfig.SEGMENT_BYTES_CONFIG -> "100000").asJava
val newTopics = Seq(
new NewTopic(topic1, 2, 3.toShort).configs(configsOverride),
new NewTopic(topic2, Option.empty[Integer].asJava, Option.empty[java.lang.Short].asJava).configs(configsOverride))
new NewTopic(topic2, Option.empty[Integer].toJava, Option.empty[java.lang.Short].toJava).configs(configsOverride))
val validateResult = client.createTopics(newTopics.asJava, new CreateTopicsOptions().validateOnly(true))
validateResult.all.get()
waitForTopics(client, List(), topics)

View File

@ -39,7 +39,7 @@ import org.junit.jupiter.params.provider.ValueSource
import scala.jdk.CollectionConverters._
import scala.collection.{Seq, mutable}
import scala.compat.java8.OptionConverters
import scala.jdk.javaapi.OptionConverters
object SslAdminIntegrationTest {
@volatile var semaphore: Option[Semaphore] = None

View File

@ -29,7 +29,7 @@ import org.apache.kafka.common.utils.Java
import org.junit.jupiter.api.{BeforeEach, TestInfo}
import java.util.Optional
import scala.compat.java8.OptionConverters
import scala.jdk.javaapi.OptionConverters
object SslEndToEndAuthorizationTest {
val superuserCn = "super-user"

View File

@ -46,8 +46,8 @@ import org.junit.jupiter.api.{AfterAll, AfterEach, BeforeAll, BeforeEach, Tag, T
import java.nio.file.{Files, Paths}
import scala.collection.Seq
import scala.compat.java8.OptionConverters._
import scala.jdk.CollectionConverters._
import scala.jdk.OptionConverters.RichOptional
trait QuorumImplementation {
def createBroker(
@ -279,7 +279,7 @@ abstract class QuorumTestHarness extends Logging {
tearDown()
}
})
val name = testInfo.getTestMethod.asScala
val name = testInfo.getTestMethod.toScala
.map(_.toString)
.getOrElse("[unspecified]")
if (TestInfoUtils.isKRaft(testInfo)) {

View File

@ -48,9 +48,9 @@ import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.mockito.ArgumentMatchers
import org.mockito.Mockito.{mock, when}
import scala.compat.java8.OptionConverters._
import scala.concurrent.duration._
import scala.jdk.CollectionConverters._
import scala.jdk.OptionConverters.RichOption
/**
* Verifies that slow appends to log don't block request threads processing replica fetch requests.
@ -324,7 +324,7 @@ class PartitionLockTest extends Logging {
segments,
0L,
0L,
leaderEpochCache.asJava,
leaderEpochCache.toJava,
producerStateManager,
new ConcurrentHashMap[String, Integer],
false

View File

@ -67,8 +67,8 @@ import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import scala.compat.java8.OptionConverters._
import scala.jdk.CollectionConverters._
import scala.jdk.OptionConverters.RichOption
object PartitionTest {
class MockPartitionListener extends PartitionListener {
@ -148,7 +148,7 @@ object PartitionTest {
minBytes,
maxBytes,
isolation,
clientMetadata.asJava
clientMetadata.toJava
)
}
}

View File

@ -37,7 +37,7 @@ import java.lang.{Long => JLong}
import java.util
import java.util.concurrent.ConcurrentHashMap
import scala.collection.mutable
import scala.compat.java8.OptionConverters._
import scala.jdk.OptionConverters.RichOption
/**
* Unit tests for the log cleaning logic
@ -124,7 +124,7 @@ class LogCleanerManagerTest extends Logging {
segments,
0L,
0L,
leaderEpochCache.asJava,
leaderEpochCache.toJava,
producerStateManager,
new ConcurrentHashMap[String, Integer],
false

View File

@ -45,8 +45,8 @@ import java.nio.file.Paths
import java.util.Properties
import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, TimeUnit}
import scala.collection._
import scala.compat.java8.OptionConverters._
import scala.jdk.CollectionConverters._
import scala.jdk.OptionConverters.RichOption
/**
* Unit tests for the log cleaning logic
@ -204,7 +204,7 @@ class LogCleanerTest extends Logging {
logSegments,
0L,
0L,
leaderEpochCache.asJava,
leaderEpochCache.toJava,
producerStateManager,
new ConcurrentHashMap[String, Integer],
false

View File

@ -53,8 +53,8 @@ import java.util.{Optional, OptionalLong, Properties}
import scala.annotation.nowarn
import scala.collection.mutable.ListBuffer
import scala.collection.{Iterable, Map, mutable}
import scala.compat.java8.OptionConverters._
import scala.jdk.CollectionConverters._
import scala.jdk.OptionConverters.{RichOption, RichOptional}
class LogLoaderTest {
var config: KafkaConfig = _
@ -163,7 +163,7 @@ class LogLoaderTest {
this.maxTransactionTimeoutMs, this.producerStateManagerConfig, time)
val logLoader = new LogLoader(logDir, topicPartition, config, time.scheduler, time,
logDirFailureChannel, hadCleanShutdown, segments, logStartOffset, logRecoveryPoint,
leaderEpochCache.asJava, producerStateManager, new ConcurrentHashMap[String, Integer], false)
leaderEpochCache.toJava, producerStateManager, new ConcurrentHashMap[String, Integer], false)
val offsets = logLoader.load()
val localLog = new LocalLog(logDir, logConfig, segments, offsets.recoveryPoint,
offsets.nextOffsetMetadata, mockTime.scheduler, mockTime, topicPartition,
@ -386,7 +386,7 @@ class LogLoaderTest {
interceptedLogSegments,
0L,
recoveryPoint,
leaderEpochCache.asJava,
leaderEpochCache.toJava,
producerStateManager,
new ConcurrentHashMap[String, Integer],
false
@ -452,7 +452,7 @@ class LogLoaderTest {
segments,
0L,
0L,
leaderEpochCache.asJava,
leaderEpochCache.toJava,
stateManager,
new ConcurrentHashMap[String, Integer],
false
@ -564,7 +564,7 @@ class LogLoaderTest {
segments,
0L,
0L,
leaderEpochCache.asJava,
leaderEpochCache.toJava,
stateManager,
new ConcurrentHashMap[String, Integer],
false
@ -621,7 +621,7 @@ class LogLoaderTest {
segments,
0L,
0L,
leaderEpochCache.asJava,
leaderEpochCache.toJava,
stateManager,
new ConcurrentHashMap[String, Integer],
false
@ -677,7 +677,7 @@ class LogLoaderTest {
segments,
0L,
0L,
leaderEpochCache.asJava,
leaderEpochCache.toJava,
stateManager,
new ConcurrentHashMap[String, Integer],
false
@ -1638,7 +1638,7 @@ class LogLoaderTest {
assertEquals(9, log.activeSegment.baseOffset)
assertEquals(9, log.logEndOffset)
for (offset <- 1 until 10) {
val snapshotFileBeforeDeletion = log.producerStateManager.snapshotFileForOffset(offset).asScala
val snapshotFileBeforeDeletion = log.producerStateManager.snapshotFileForOffset(offset).toScala
assertTrue(snapshotFileBeforeDeletion.isDefined)
assertTrue(snapshotFileBeforeDeletion.get.file.exists)
}
@ -1693,7 +1693,7 @@ class LogLoaderTest {
.filter(snapshotFile => snapshotFile.file.exists())
.map(_.offset)
val inMemorySnapshotFiles = (1 until 5)
.flatMap(offset => log.producerStateManager.snapshotFileForOffset(offset).asScala)
.flatMap(offset => log.producerStateManager.snapshotFileForOffset(offset).toScala)
assertTrue(offsetsWithSnapshotFiles.isEmpty, s"Found offsets with producer state snapshot files: $offsetsWithSnapshotFiles while none were expected.")
assertTrue(inMemorySnapshotFiles.isEmpty, s"Found in-memory producer state snapshot files: $inMemorySnapshotFiles while none were expected.")
@ -1826,7 +1826,7 @@ class LogLoaderTest {
segments,
0L,
0L,
leaderEpochCache.asJava,
leaderEpochCache.toJava,
stateManager,
new ConcurrentHashMap[String, Integer],
isRemoteLogEnabled

View File

@ -59,8 +59,8 @@ import java.util.concurrent.{Callable, ConcurrentHashMap, Executors, TimeUnit}
import java.util.{Optional, OptionalLong, Properties}
import scala.annotation.nowarn
import scala.collection.mutable.ListBuffer
import scala.compat.java8.OptionConverters._
import scala.jdk.CollectionConverters._
import scala.jdk.OptionConverters.{RichOptional, RichOptionalInt}
class UnifiedLogTest {
var config: KafkaConfig = _
@ -646,14 +646,14 @@ class UnifiedLogTest {
baseOffset = 27)
appendAsFollower(log, records, leaderEpoch = 19)
assertEquals(Some(new EpochEntry(19, 27)),
log.leaderEpochCache.flatMap(_.latestEntry.asScala))
log.leaderEpochCache.flatMap(_.latestEntry.toScala))
assertEquals(29, log.logEndOffset)
def verifyTruncationClearsEpochCache(epoch: Int, truncationOffset: Long): Unit = {
// Simulate becoming a leader
log.maybeAssignEpochStartOffset(leaderEpoch = epoch, startOffset = log.logEndOffset)
assertEquals(Some(new EpochEntry(epoch, 29)),
log.leaderEpochCache.flatMap(_.latestEntry.asScala))
log.leaderEpochCache.flatMap(_.latestEntry.toScala))
assertEquals(29, log.logEndOffset)
// Now we become the follower and truncate to an offset greater
@ -661,7 +661,7 @@ class UnifiedLogTest {
// at the end of the log should be gone
log.truncateTo(truncationOffset)
assertEquals(Some(new EpochEntry(19, 27)),
log.leaderEpochCache.flatMap(_.latestEntry.asScala))
log.leaderEpochCache.flatMap(_.latestEntry.toScala))
assertEquals(29, log.logEndOffset)
}
@ -2563,12 +2563,12 @@ class UnifiedLogTest {
val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1000, indexIntervalBytes = 1, maxMessageBytes = 64 * 1024)
val log = createLog(logDir, logConfig)
log.appendAsLeader(TestUtils.records(List(new SimpleRecord("foo".getBytes()))), leaderEpoch = 5)
assertEquals(Some(5), log.leaderEpochCache.flatMap(_.latestEpoch.asScala))
assertEquals(Some(5), log.leaderEpochCache.flatMap(_.latestEpoch.toScala))
log.appendAsFollower(TestUtils.records(List(new SimpleRecord("foo".getBytes())),
baseOffset = 1L,
magicValue = RecordVersion.V1.value))
assertEquals(None, log.leaderEpochCache.flatMap(_.latestEpoch.asScala))
assertEquals(None, log.leaderEpochCache.flatMap(_.latestEpoch.toScala))
}
@nowarn("cat=deprecation")

View File

@ -50,8 +50,8 @@ import java.nio.ByteBuffer
import java.util.Collections
import java.util.concurrent.atomic.AtomicReference
import scala.collection.{Map, Seq}
import scala.compat.java8.OptionConverters._
import scala.jdk.CollectionConverters._
import scala.jdk.OptionConverters.RichOption
class RequestChannelTest {
private val requestChannelMetrics: RequestChannelMetrics = mock(classOf[RequestChannelMetrics])
@ -71,7 +71,7 @@ class RequestChannelTest {
val loggableAlterConfigs = alterConfigs.loggableRequest.asInstanceOf[AlterConfigsRequest]
val loggedConfig = loggableAlterConfigs.configs.get(resource)
assertEquals(expectedValues, toMap(loggedConfig))
val alterConfigsDesc = RequestConvertToJson.requestDesc(alterConfigs.header, alterConfigs.requestLog.asJava, alterConfigs.isForwarded).toString
val alterConfigsDesc = RequestConvertToJson.requestDesc(alterConfigs.header, alterConfigs.requestLog.toJava, alterConfigs.isForwarded).toString
assertFalse(alterConfigsDesc.contains(sensitiveValue), s"Sensitive config logged $alterConfigsDesc")
}
@ -135,7 +135,7 @@ class RequestChannelTest {
val loggableAlterConfigs = alterConfigs.loggableRequest.asInstanceOf[IncrementalAlterConfigsRequest]
val loggedConfig = loggableAlterConfigs.data.resources.find(resource.`type`.id, resource.name).configs
assertEquals(expectedValues, toMap(loggedConfig))
val alterConfigsDesc = RequestConvertToJson.requestDesc(alterConfigs.header, alterConfigs.requestLog.asJava, alterConfigs.isForwarded).toString
val alterConfigsDesc = RequestConvertToJson.requestDesc(alterConfigs.header, alterConfigs.requestLog.toJava, alterConfigs.isForwarded).toString
assertFalse(alterConfigsDesc.contains(sensitiveValue), s"Sensitive config logged $alterConfigsDesc")
}

View File

@ -33,7 +33,7 @@ import org.junit.jupiter.api.Test
import org.mockito.Mockito.mock
import java.util.Collections
import scala.compat.java8.OptionConverters._
import scala.jdk.OptionConverters.RichOption
class RequestConvertToJsonTest {
@ -76,7 +76,7 @@ class RequestConvertToJsonTest {
expectedNode.set("requestHeader", RequestConvertToJson.requestHeaderNode(req.header))
expectedNode.set("request", req.requestLog.getOrElse(new TextNode("")))
val actualNode = RequestConvertToJson.requestDesc(req.header, req.requestLog.asJava, req.isForwarded)
val actualNode = RequestConvertToJson.requestDesc(req.header, req.requestLog.toJava, req.isForwarded)
assertEquals(expectedNode, actualNode)
}
@ -99,7 +99,7 @@ class RequestConvertToJsonTest {
val temporaryMemoryBytes = 8
val messageConversionsTimeMs = 9
val expectedNode = RequestConvertToJson.requestDesc(req.header, req.requestLog.asJava, req.isForwarded).asInstanceOf[ObjectNode]
val expectedNode = RequestConvertToJson.requestDesc(req.header, req.requestLog.toJava, req.isForwarded).asInstanceOf[ObjectNode]
expectedNode.set("response", res.responseLog.getOrElse(new TextNode("")))
expectedNode.set("connection", new TextNode(req.context.connectionId))
expectedNode.set("totalTimeMs", new DoubleNode(totalTimeMs))
@ -116,7 +116,7 @@ class RequestConvertToJsonTest {
expectedNode.set("temporaryMemoryBytes", new LongNode(temporaryMemoryBytes))
expectedNode.set("messageConversionsTime", new DoubleNode(messageConversionsTimeMs))
val actualNode = RequestConvertToJson.requestDescMetrics(req.header, req.requestLog.asJava, res.responseLog.asJava, req.context, req.session, req.isForwarded,
val actualNode = RequestConvertToJson.requestDescMetrics(req.header, req.requestLog.toJava, res.responseLog.toJava, req.context, req.session, req.isForwarded,
totalTimeMs, requestQueueTimeMs, apiLocalTimeMs, apiRemoteTimeMs, apiThrottleTimeMs, responseQueueTimeMs,
responseSendTimeMs, temporaryMemoryBytes, messageConversionsTimeMs).asInstanceOf[ObjectNode]

View File

@ -31,14 +31,14 @@ import org.apache.kafka.test.TestUtils
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Tag
import scala.compat.java8.OptionConverters._
import scala.jdk.CollectionConverters._
import scala.jdk.OptionConverters.RichOptional
@Tag("integration")
abstract class AbstractApiVersionsRequestTest(cluster: ClusterInstance) {
def sendApiVersionsRequest(request: ApiVersionsRequest, listenerName: ListenerName): ApiVersionsResponse = {
val socket = if (cluster.controllerListenerName().asScala.contains(listenerName)) {
val socket = if (cluster.controllerListenerName().toScala.contains(listenerName)) {
cluster.controllerSocketServers().asScala.head
} else {
cluster.brokerSocketServers().asScala.head
@ -89,7 +89,7 @@ abstract class AbstractApiVersionsRequestTest(cluster: ClusterInstance) {
ApiKeys.apisForListener(ApiMessageType.ListenerType.ZK_BROKER),
enableUnstableLastVersion
)
} else if (cluster.controllerListenerName().asScala.contains(listenerName)) {
} else if (cluster.controllerListenerName().toScala.contains(listenerName)) {
ApiVersionsResponse.collectApis(
ApiKeys.apisForListener(ApiMessageType.ListenerType.CONTROLLER),
enableUnstableLastVersion
@ -109,7 +109,7 @@ abstract class AbstractApiVersionsRequestTest(cluster: ClusterInstance) {
val defaultApiVersionsResponse = if (!cluster.isKRaftTest) {
TestUtils.defaultApiVersionsResponse(0, ListenerType.ZK_BROKER, enableUnstableLastVersion)
} else if (cluster.controllerListenerName().asScala.contains(listenerName)) {
} else if (cluster.controllerListenerName().toScala.contains(listenerName)) {
TestUtils.defaultApiVersionsResponse(0, ListenerType.CONTROLLER, enableUnstableLastVersion)
} else {
TestUtils.createApiVersionsResponse(0, expectedApis)

View File

@ -26,8 +26,8 @@ import org.junit.jupiter.api.Assertions.assertThrows
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import scala.compat.java8.OptionConverters
import scala.concurrent.ExecutionException
import scala.jdk.javaapi.OptionConverters
class DelegationTokenRequestsOnPlainTextTest extends BaseRequestTest {
var adminClient: Admin = _

View File

@ -30,9 +30,9 @@ import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import java.util
import scala.compat.java8.OptionConverters
import scala.concurrent.ExecutionException
import scala.jdk.CollectionConverters._
import scala.jdk.javaapi.OptionConverters
class DelegationTokenRequestsTest extends IntegrationTestHarness with SaslSetup {
override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT

View File

@ -27,8 +27,8 @@ import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import java.util
import scala.compat.java8.OptionConverters
import scala.concurrent.ExecutionException
import scala.jdk.javaapi.OptionConverters
class DelegationTokenRequestsWithDisableTokenFeatureTest extends BaseRequestTest with SaslSetup {
override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT

View File

@ -32,8 +32,8 @@ import org.apache.kafka.common.{TopicPartition, Uuid}
import java.nio.ByteBuffer
import java.util.Optional
import scala.collection.{Map, Set, mutable}
import scala.compat.java8.OptionConverters._
import scala.jdk.CollectionConverters._
import scala.jdk.OptionConverters.{RichOption, RichOptional}
import scala.util.Random
class MockLeaderEndPoint(sourceBroker: BrokerEndPoint = new BrokerEndPoint(1, host = "localhost", port = Random.nextInt()),
@ -154,7 +154,7 @@ class MockLeaderEndPoint(sourceBroker: BrokerEndPoint = new BrokerEndPoint(1, ho
if (state.isReadyForFetch) {
val replicaState = replicaPartitionStateCallback(partition).getOrElse(throw new IllegalArgumentException(s"Unknown partition $partition"))
val lastFetchedEpoch = if (isTruncationOnFetchSupported)
state.lastFetchedEpoch.map(_.asInstanceOf[Integer]).asJava
state.lastFetchedEpoch.map(_.asInstanceOf[Integer]).toJava
else
Optional.empty[Integer]
fetchData.put(partition,
@ -204,7 +204,7 @@ class MockLeaderEndPoint(sourceBroker: BrokerEndPoint = new BrokerEndPoint(1, ho
lastFetchedEpoch: Optional[Integer],
fetchOffset: Long,
partitionState: PartitionState): Option[FetchResponseData.EpochEndOffset] = {
lastFetchedEpoch.asScala.flatMap { fetchEpoch =>
lastFetchedEpoch.toScala.flatMap { fetchEpoch =>
val epochEndOffset = fetchEpochEndOffsets(
Map(topicPartition -> new EpochData()
.setPartition(topicPartition.partition)

View File

@ -90,8 +90,8 @@ import java.util.concurrent.{Callable, ConcurrentHashMap, CountDownLatch, TimeUn
import java.util.stream.IntStream
import java.util.{Collections, Optional, OptionalLong, Properties}
import scala.collection.{Map, Seq, mutable}
import scala.compat.java8.OptionConverters.RichOptionForJava8
import scala.jdk.CollectionConverters._
import scala.jdk.OptionConverters.RichOption
object ReplicaManagerTest {
@AfterAll
@ -2938,7 +2938,7 @@ class ReplicaManagerTest {
segments,
0L,
0L,
leaderEpochCache.asJava,
leaderEpochCache.toJava,
producerStateManager,
new ConcurrentHashMap[String, Integer],
false
@ -3318,7 +3318,7 @@ class ReplicaManagerTest {
minBytes,
maxBytes,
isolation,
clientMetadata.asJava
clientMetadata.toJava
)
replicaManager.fetchMessages(

View File

@ -28,7 +28,8 @@ import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, Timeout}
import scala.compat.java8.OptionConverters._
import scala.jdk.OptionConverters.RichOption
class SchedulerTest {
@ -154,7 +155,7 @@ class SchedulerTest {
segments,
0L,
0L,
leaderEpochCache.asJava,
leaderEpochCache.toJava,
producerStateManager,
new ConcurrentHashMap[String, Integer],
false

View File

@ -88,11 +88,11 @@ import java.util.{Arrays, Collections, Optional, Properties}
import scala.annotation.nowarn
import scala.collection.mutable.ArrayBuffer
import scala.collection.{Map, Seq, mutable}
import scala.compat.java8.OptionConverters
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.jdk.CollectionConverters._
import scala.jdk.OptionConverters.RichOption
import scala.jdk.javaapi.OptionConverters
import scala.util.{Failure, Success, Try}
/**

View File

@ -141,7 +141,6 @@ versions += [
// has the version field as mandatory in its configuration, see
// https://github.com/scalameta/scalafmt/releases/tag/v3.1.0.
scalafmt: "3.7.14",
scalaJava8Compat : "1.0.2",
scoverage: "2.0.11",
slf4j: "1.7.36",
snappy: "1.1.10.5",
@ -238,7 +237,6 @@ libs += [
protobuf: "com.google.protobuf:protobuf-java:$versions.protobuf",
reload4j: "ch.qos.reload4j:reload4j:$versions.reload4j",
rocksDBJni: "org.rocksdb:rocksdbjni:$versions.rocksDB",
scalaJava8Compat: "org.scala-lang.modules:scala-java8-compat_$versions.baseScala:$versions.scalaJava8Compat",
scalaLibrary: "org.scala-lang:scala-library:$versions.scala",
scalaLogging: "com.typesafe.scala-logging:scala-logging_$versions.baseScala:$versions.scalaLogging",
scalaReflect: "org.scala-lang:scala-reflect:$versions.scala",

View File

@ -70,7 +70,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import scala.Option;
import scala.compat.java8.OptionConverters;
import scala.jdk.javaapi.OptionConverters;
@State(Scope.Benchmark)
@Fork(value = 1)

View File

@ -64,7 +64,7 @@ import java.util.UUID;
import java.util.concurrent.TimeUnit;
import scala.Option;
import scala.compat.java8.OptionConverters;
import scala.jdk.javaapi.OptionConverters;
@State(Scope.Benchmark)
@Fork(value = 1)

View File

@ -54,7 +54,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import scala.compat.java8.OptionConverters;
import scala.jdk.javaapi.OptionConverters;
/**
* Wraps a {@link KafkaClusterTestKit} inside lifecycle methods for a test invocation. Each instance of this