MINOR: Fix Scala 2.13 compiler warnings (#8390)

Once Scala 2.13.2 is officially released, I will submit a follow up PR
that enables `-Xfatal-warnings` with the necessary warning
exclusions. Compiler warning exclusions were only introduced in 2.13.2
and hence why we have to wait for that. I used a snapshot build to
test it in the meantime.

Changes:
* Remove Deprecated annotation from internal request classes
* Class.newInstance is deprecated in favor of
Class.getConstructor().newInstance
* Replace deprecated JavaConversions with CollectionConverters
* Remove unused kafka.cluster.Cluster
* Don't use Map and Set methods deprecated in 2.13:
    - collection.Map +, ++, -, --, mapValues, filterKeys, retain
    - collection.Set +, ++, -, --
* Add scala-collection-compat dependency to streams-scala and
update version to 2.1.4.
* Replace usages of deprecated Either.get and Either.right
* Replace usage of deprecated Integer(String) constructor
* `import scala.language.implicitConversions` is not needed in Scala 2.13
* Replace usage of deprecated `toIterator`, `Traversable`, `seq`,
`reverseMap`, `hasDefiniteSize`
* Replace usage of deprecated alterConfigs with incrementalAlterConfigs
where possible
* Fix implicit widening conversions from Long/Int to Double/Float
* Avoid implicit conversions to String
* Eliminate usage of deprecated procedure syntax
* Remove `println`in `LogValidatorTest` instead of fixing the compiler
warning since tests should not `println`.
* Eliminate implicit conversion from Array to Seq
* Remove unnecessary usage of 3 argument assertEquals
* Replace `toStream` with `iterator`
* Do not use deprecated SaslConfigs.DEFAULT_SASL_ENABLED_MECHANISMS
* Replace StringBuilder.newBuilder with new StringBuilder
* Rename AclBuffers to AclSeqs and remove usage of `filterKeys`
* More consistent usage of Set/Map in Controller classes: this also fixes
deprecated warnings with Scala 2.13
* Add spotBugs exclusion for inliner artifact in KafkaApis with Scala 2.12.

Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Ismael Juma 2020-04-01 06:20:48 -07:00 committed by GitHub
parent cb9125106d
commit 90bbeedf52
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
259 changed files with 779 additions and 814 deletions

View File

@ -1302,6 +1302,7 @@ project(':streams:streams-scala') {
compile project(':streams')
compile libs.scalaLibrary
compile libs.scalaCollectionCompat
testCompile project(':core')
testCompile project(':core').sourceSets.test.output

View File

@ -186,7 +186,6 @@ public class ListOffsetRequest extends AbstractRequest {
public static final class PartitionData {
public final long timestamp;
@Deprecated
public final int maxNumOffsets; // only supported in v0
public final Optional<Integer> currentLeaderEpoch;
@ -196,7 +195,7 @@ public class ListOffsetRequest extends AbstractRequest {
this.currentLeaderEpoch = currentLeaderEpoch;
}
@Deprecated
// For V0
public PartitionData(long timestamp, int maxNumOffsets) {
this(timestamp, maxNumOffsets, Optional.empty());
}

View File

@ -137,7 +137,6 @@ public class ListOffsetResponse extends AbstractResponse {
public static final class PartitionData {
public final Errors error;
// The offsets list is only used in ListOffsetResponse v0.
@Deprecated
public final List<Long> offsets;
public final Long timestamp;
public final Long offset;
@ -146,7 +145,6 @@ public class ListOffsetResponse extends AbstractResponse {
/**
* Constructor for ListOffsetResponse v0
*/
@Deprecated
public PartitionData(Errors error, List<Long> offsets) {
this.error = error;
this.offsets = offsets;

View File

@ -39,9 +39,7 @@ public class OffsetCommitRequest extends AbstractRequest {
public static final String DEFAULT_MEMBER_ID = "";
public static final long DEFAULT_RETENTION_TIME = -1L;
// default values for old versions,
// will be removed after these versions are deprecated
@Deprecated
// default values for old versions, will be removed after these versions are no longer supported
public static final long DEFAULT_TIMESTAMP = -1L; // for V0, V1
private final OffsetCommitRequestData data;

View File

@ -82,7 +82,8 @@ public class SecurityUtils {
try {
String[] securityProviderClasses = securityProviderClassesStr.replaceAll("\\s+", "").split(",");
for (int index = 0; index < securityProviderClasses.length; index++) {
SecurityProviderCreator securityProviderCreator = (SecurityProviderCreator) Class.forName(securityProviderClasses[index]).newInstance();
SecurityProviderCreator securityProviderCreator =
(SecurityProviderCreator) Class.forName(securityProviderClasses[index]).getConstructor().newInstance();
securityProviderCreator.configure(configs);
Security.insertProviderAt(securityProviderCreator.getProvider(), index + 1);
}
@ -91,7 +92,7 @@ public class SecurityUtils {
" are expected to be sub-classes of SecurityProviderCreator");
} catch (ClassNotFoundException cnfe) {
LOGGER.error("Unrecognized security provider creator class", cnfe);
} catch (IllegalAccessException | InstantiationException e) {
} catch (ReflectiveOperationException e) {
LOGGER.error("Unexpected implementation of security provider creator class", e);
}
}

View File

@ -648,7 +648,7 @@ public final class MessageTest {
message.write(byteBufferAccessor, cache, version);
assertEquals("The result of the size function does not match the number of bytes " +
"written for version " + version, size, buf.position());
Message message2 = message.getClass().newInstance();
Message message2 = message.getClass().getConstructor().newInstance();
buf.flip();
message2.read(byteBufferAccessor, version);
assertEquals("The result of the size function does not match the number of bytes " +
@ -661,7 +661,7 @@ public final class MessageTest {
private void testStructRoundTrip(short version, Message message, Message expected) throws Exception {
Struct struct = message.toStruct(version);
Message message2 = message.getClass().newInstance();
Message message2 = message.getClass().getConstructor().newInstance();
message2.fromStruct(struct, version);
assertEquals(expected, message2);
assertEquals(expected.hashCode(), message2.hashCode());

View File

@ -331,7 +331,7 @@ public class ConnectorConfig extends AbstractConfig {
}
Transformation transformation;
try {
transformation = transformationCls.asSubclass(Transformation.class).newInstance();
transformation = transformationCls.asSubclass(Transformation.class).getConstructor().newInstance();
} catch (Exception e) {
throw new ConfigException(key, String.valueOf(transformationCls), "Error getting config definition from Transformation: " + e.getMessage());
}

View File

@ -524,8 +524,8 @@ public class AbstractHerderTest {
EasyMock.expect(worker.getPlugins()).andStubReturn(plugins);
final Connector connector;
try {
connector = connectorClass.newInstance();
} catch (InstantiationException | IllegalAccessException e) {
connector = connectorClass.getConstructor().newInstance();
} catch (ReflectiveOperationException e) {
throw new RuntimeException("Couldn't create connector", e);
}
EasyMock.expect(plugins.newConnector(connectorClass.getName())).andReturn(connector);

View File

@ -479,7 +479,7 @@ public class ConnectorPluginsResourceTest {
public MockConnectorPluginDesc(Class<? extends Connector> klass) throws Exception {
super(
klass,
klass.newInstance().version(),
klass.getConstructor().newInstance().version(),
new MockPluginClassLoader(null, new URL[0])
);
}

View File

@ -25,7 +25,7 @@ import kafka.server.{KafkaServer, KafkaServerStartable}
import kafka.utils.{CommandLineUtils, Exit, Logging}
import org.apache.kafka.common.utils.{Java, LoggingSignalHandler, OperatingSystem, Utils}
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
object Kafka extends Logging {

View File

@ -34,7 +34,7 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.{Utils, SecurityUtils => JSecurityUtils}
import org.apache.kafka.server.authorizer.Authorizer
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
import scala.compat.java8.OptionConverters._
import scala.collection.mutable
import scala.io.StdIn
@ -147,8 +147,9 @@ object AclCommand extends Logging {
} else {
listPrincipals.foreach(principal => {
println(s"ACLs for principal `$principal`")
val filteredResourceToAcls = resourceToAcls.mapValues(acls =>
acls.filter(acl => principal.toString.equals(acl.principal))).filter(entry => entry._2.nonEmpty)
val filteredResourceToAcls = resourceToAcls.map { case (resource, acls) =>
resource -> acls.filter(acl => principal.toString.equals(acl.principal))
}.filter { case (_, acls) => acls.nonEmpty }
for ((resource, acls) <- filteredResourceToAcls)
println(s"Current ACLs for resource `$resource`: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline")
@ -263,8 +264,9 @@ object AclCommand extends Logging {
} else {
listPrincipals.foreach(principal => {
println(s"ACLs for principal `$principal`")
val filteredResourceToAcls = resourceToAcls.mapValues(acls =>
acls.filter(acl => principal.toString.equals(acl.principal))).filter(entry => entry._2.nonEmpty)
val filteredResourceToAcls = resourceToAcls.map { case (resource, acls) =>
resource -> acls.filter(acl => principal.toString.equals(acl.principal))
}.filter { case (_, acls) => acls.nonEmpty }
for ((resource, acls) <- filteredResourceToAcls)
println(s"Current ACLs for resource `$resource`: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline")

View File

@ -42,7 +42,7 @@ import org.apache.kafka.common.Node
import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionsResponseKeyCollection
import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, ApiVersionsRequest, ApiVersionsResponse, MetadataRequest, MetadataResponse}
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
import scala.util.{Failure, Success, Try}
/**

View File

@ -39,10 +39,9 @@ import org.apache.kafka.common.security.scram.internals.{ScramCredentialUtils, S
import org.apache.kafka.common.utils.{Sanitizer, Time, Utils}
import org.apache.zookeeper.client.ZKClientConfig
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
import scala.collection._
/**
* This script can be used to change configs for topics/clients/users/brokers dynamically
* An entity described or altered by the command may be one of:

View File

@ -31,9 +31,9 @@ import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.{KafkaException, Node, TopicPartition}
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
import scala.collection.mutable.ListBuffer
import scala.collection.{Map, Seq, Set, immutable, mutable}
import scala.collection.{Map, Seq, immutable, mutable}
import scala.util.{Failure, Success, Try}
import joptsimple.OptionSpec
import org.apache.kafka.common.protocol.Errors
@ -403,7 +403,7 @@ object ConsumerGroupCommand extends Logging {
}
def deleteOffsets(groupId: String, topics: List[String]): (Errors, Map[TopicPartition, Throwable]) = {
var partitionLevelResult: Map[TopicPartition, Throwable] = mutable.HashMap()
val partitionLevelResult = mutable.Map[TopicPartition, Throwable]()
val (topicWithPartitions, topicWithoutPartitions) = topics.partition(_.contains(":"))
@ -527,8 +527,9 @@ object ConsumerGroupCommand extends Logging {
partitionOffsets, Some(s"${consumerSummary.consumerId}"), Some(s"${consumerSummary.host}"),
Some(s"${consumerSummary.clientId}"))
}
val rowsWithoutConsumer = committedOffsets.filterKeys(!assignedTopicPartitions.contains(_)).flatMap {
case (topicPartition, offset) =>
val rowsWithoutConsumer = committedOffsets.filter { case (tp, _) =>
!assignedTopicPartitions.contains(tp)
}.flatMap { case (topicPartition, offset) =>
collectConsumerAssignment(
groupId,
Option(consumerGroup.coordinator),
@ -862,10 +863,10 @@ object ConsumerGroupCommand extends Logging {
withTimeoutMs(new DeleteConsumerGroupsOptions)
).deletedGroups().asScala
val result = groupsToDelete.mapValues { f =>
val result = groupsToDelete.map { case (g, f) =>
Try(f.get) match {
case _: Success[_] => null
case Failure(e) => e
case Success(_) => g -> null
case Failure(e) => g -> e
}
}
@ -1009,11 +1010,11 @@ object ConsumerGroupCommand extends Logging {
options = parser.parse(args : _*)
val allGroupSelectionScopeOpts: Set[OptionSpec[_]] = Set(groupOpt, allGroupsOpt)
val allConsumerGroupLevelOpts: Set[OptionSpec[_]] = Set(listOpt, describeOpt, deleteOpt, resetOffsetsOpt)
val allResetOffsetScenarioOpts: Set[OptionSpec[_]] = Set(resetToOffsetOpt, resetShiftByOpt,
val allGroupSelectionScopeOpts = immutable.Set[OptionSpec[_]](groupOpt, allGroupsOpt)
val allConsumerGroupLevelOpts = immutable.Set[OptionSpec[_]](listOpt, describeOpt, deleteOpt, resetOffsetsOpt)
val allResetOffsetScenarioOpts = immutable.Set[OptionSpec[_]](resetToOffsetOpt, resetShiftByOpt,
resetToDatetimeOpt, resetByDurationOpt, resetToEarliestOpt, resetToLatestOpt, resetToCurrentOpt, resetFromFileOpt)
val allDeleteOffsetsOpts: Set[OptionSpec[_]] = Set(groupOpt, topicOpt)
val allDeleteOffsetsOpts = immutable.Set[OptionSpec[_]](groupOpt, topicOpt)
def checkArgs(): Unit = {

View File

@ -29,7 +29,7 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.security.token.delegation.DelegationToken
import org.apache.kafka.common.utils.{SecurityUtils, Utils}
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
import scala.collection.Set
/**

View File

@ -28,7 +28,7 @@ import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.utils.Utils
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
import scala.collection.Seq
/**

View File

@ -32,7 +32,7 @@ import org.apache.kafka.common.errors.ClusterAuthorizationException
import org.apache.kafka.common.errors.ElectionNotNeededException
import org.apache.kafka.common.errors.TimeoutException
import org.apache.kafka.common.utils.Utils
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
import scala.collection.mutable
import scala.concurrent.duration._

View File

@ -25,7 +25,7 @@ import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, DescribeLogDirs
import org.apache.kafka.common.requests.DescribeLogDirsResponse.LogDirInfo
import org.apache.kafka.common.utils.Utils
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
import scala.collection.Map
/**
@ -48,7 +48,7 @@ object LogDirsCommand {
out.println("Querying brokers for log directories information")
val describeLogDirsResult: DescribeLogDirsResult = adminClient.describeLogDirs(brokerList.map(Integer.valueOf).toSeq.asJava)
val logDirInfosByBroker = describeLogDirsResult.all.get().asScala.mapValues(_.asScala).toMap
val logDirInfosByBroker = describeLogDirsResult.all.get().asScala.map { case (k, v) => k -> v.asScala }
out.println(s"Received log directory information from brokers ${brokerList.mkString(",")}")
out.println(formatAsJson(logDirInfosByBroker, topicList.toSet))

View File

@ -16,7 +16,7 @@
*/
package kafka.admin
import collection.JavaConverters._
import scala.jdk.CollectionConverters._
import collection._
import java.util.Properties
import java.util.concurrent.ExecutionException

View File

@ -34,7 +34,7 @@ import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{KafkaException, KafkaFuture, TopicPartition, TopicPartitionReplica}
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
import scala.collection.{Map, Seq, mutable}
import scala.compat.java8.OptionConverters._
import scala.math.Ordered.orderingToOrdered
@ -1062,8 +1062,7 @@ object ReassignPartitionsCommand extends Logging {
def curReassignmentsToString(adminClient: Admin): String = {
val currentReassignments = adminClient.
listPartitionReassignments().reassignments().get().asScala
val text = currentReassignments.keySet.toBuffer.sortWith(compareTopicPartitions).map {
case part =>
val text = currentReassignments.keySet.toBuffer.sortWith(compareTopicPartitions).map { part =>
val reassignment = currentReassignments(part)
val replicas = reassignment.replicas().asScala
val addingReplicas = reassignment.addingReplicas().asScala
@ -1155,7 +1154,7 @@ object ReassignPartitionsCommand extends Logging {
def currentPartitionReplicaAssignmentToString(proposedParts: Map[TopicPartition, Seq[Int]],
currentParts: Map[TopicPartition, Seq[Int]]): String = {
"Current partition replica assignment%n%n%s%n%nSave this to use as the %s".
format(formatAsReassignmentJson(currentParts.filterKeys(proposedParts.contains(_)).toMap, Map.empty),
format(formatAsReassignmentJson(currentParts.filter { case (k, _) => proposedParts.contains(k) }.toMap, Map.empty),
"--reassignment-json-file option during rollback")
}
@ -1192,13 +1191,10 @@ object ReassignPartitionsCommand extends Logging {
* @return A map from partition objects to error strings.
*/
def alterPartitionReassignments(adminClient: Admin,
reassignments: Map[TopicPartition, Seq[Int]])
: Map[TopicPartition, Throwable] = {
reassignments: Map[TopicPartition, Seq[Int]]): Map[TopicPartition, Throwable] = {
val results: Map[TopicPartition, KafkaFuture[Void]] =
adminClient.alterPartitionReassignments(reassignments.map {
case (part, replicas) => {
adminClient.alterPartitionReassignments(reassignments.map { case (part, replicas) =>
(part, Optional.of(new NewPartitionReassignment(replicas.map(Integer.valueOf(_)).asJava)))
}
}.asJava).values().asScala
results.flatMap {
case (part, future) => {

View File

@ -38,7 +38,7 @@ import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.zookeeper.KeeperException.NodeExistsException
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
import scala.collection._
import scala.compat.java8.OptionConverters._
import scala.concurrent.ExecutionException
@ -653,7 +653,7 @@ object TopicCommand extends Logging {
options = parser.parse(args : _*)
private val allTopicLevelOpts: Set[OptionSpec[_]] = Set(alterOpt, createOpt, describeOpt, listOpt, deleteOpt)
private val allTopicLevelOpts = immutable.Set[OptionSpec[_]](alterOpt, createOpt, describeOpt, listOpt, deleteOpt)
private val allReplicationReportOpts: Set[OptionSpec[_]] = Set(reportUnderReplicatedPartitionsOpt, reportUnderMinIsrPartitionsOpt, reportAtMinIsrPartitionsOpt, reportUnavailablePartitionsOpt)

View File

@ -30,7 +30,7 @@ import org.apache.zookeeper.client.ZKClientConfig
import org.apache.zookeeper.data.Stat
import scala.annotation.tailrec
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
import scala.collection.mutable.Queue
import scala.concurrent._
import scala.concurrent.duration._

View File

@ -19,7 +19,7 @@ package kafka
import org.apache.kafka.common.ElectionType
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.requests.ElectLeadersRequest
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
package object api {
implicit final class ElectLeadersRequestOps(val self: ElectLeadersRequest) extends AnyVal {

View File

@ -27,7 +27,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.server.authorizer.AuthorizerServerInfo
import scala.collection.Seq
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
object Broker {
private[cluster] case class ServerInfo(clusterResource: ClusterResource,

View File

@ -1,45 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.cluster
import scala.collection._
/**
* The set of active brokers in the cluster
*/
private[kafka] class Cluster {
private val brokers = new mutable.HashMap[Int, Broker]
def this(brokerList: Iterable[Broker]) {
this()
for(broker <- brokerList)
brokers.put(broker.id, broker)
}
def getBroker(id: Int): Option[Broker] = brokers.get(id)
def add(broker: Broker) = brokers.put(broker.id, broker)
def remove(id: Int) = brokers.remove(id)
def size = brokers.size
override def toString: String =
"Cluster(" + brokers.values.mkString(", ") + ")"
}

View File

@ -40,7 +40,7 @@ import org.apache.kafka.common.requests.EpochEndOffset._
import org.apache.kafka.common.requests._
import org.apache.kafka.common.utils.Time
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
import scala.collection.{Map, Seq}
trait PartitionStateStore {
@ -662,13 +662,11 @@ class Partition(val topicPartition: TopicPartition,
isr: Set[Int],
addingReplicas: Seq[Int],
removingReplicas: Seq[Int]): Unit = {
val replicaSet = assignment.toSet
val removedReplicas = remoteReplicasMap.keys -- replicaSet
remoteReplicasMap.clear()
assignment
.filter(_ != localBrokerId)
.foreach(id => remoteReplicasMap.getAndMaybePut(id, new Replica(id, topicPartition)))
removedReplicas.foreach(remoteReplicasMap.remove)
if (addingReplicas.nonEmpty || removingReplicas.nonEmpty)
assignmentState = OngoingReassignmentState(addingReplicas, removingReplicas, assignment)
else

View File

@ -27,7 +27,7 @@ import org.apache.kafka.common.internals.FatalExitError
import org.apache.kafka.common.requests.AbstractRequest
import org.apache.kafka.common.utils.Time
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
/**
* Class for inter-broker send thread that utilize a non-blocking network client.

View File

@ -37,7 +37,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.{LogContext, Time}
import org.apache.kafka.common.{KafkaException, Node, Reconfigurable, TopicPartition}
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
import scala.collection.mutable.{HashMap, ListBuffer}
import scala.collection.{Seq, Set, mutable}
@ -445,8 +445,8 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,
else if (config.interBrokerProtocolVersion >= KAFKA_1_0_IV0) 1
else 0
leaderAndIsrRequestMap.filterKeys(controllerContext.liveOrShuttingDownBrokerIds.contains).foreach {
case (broker, leaderAndIsrPartitionStates) =>
leaderAndIsrRequestMap.foreach { case (broker, leaderAndIsrPartitionStates) =>
if (controllerContext.liveOrShuttingDownBrokerIds.contains(broker)) {
val numBecomeLeaders = leaderAndIsrPartitionStates.count { case (topicPartition, state) =>
val isBecomeLeader = broker == state.leader
val typeOfRequest =
@ -470,8 +470,7 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,
val leaderAndIsrResponse = r.asInstanceOf[LeaderAndIsrResponse]
sendEvent(LeaderAndIsrResponseReceived(leaderAndIsrResponse, broker))
})
}
}
leaderAndIsrRequestMap.clear()
}
@ -553,7 +552,8 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,
}
val traceEnabled = stateChangeLog.isTraceEnabled
stopReplicaRequestMap.filterKeys(controllerContext.liveOrShuttingDownBrokerIds.contains).foreach { case (brokerId, replicaInfoList) =>
stopReplicaRequestMap.foreach { case (brokerId, replicaInfoList) =>
if (controllerContext.liveOrShuttingDownBrokerIds.contains(brokerId)) {
val (stopReplicaWithDelete, stopReplicaWithoutDelete) = replicaInfoList.partition(r => r.deletePartition)
val brokerEpoch = controllerContext.liveBrokerIdAndEpochs(brokerId)
@ -574,6 +574,7 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,
sendRequest(brokerId, stopReplicaRequest)
}
}
}
stopReplicaRequestMap.clear()
}

View File

@ -74,19 +74,19 @@ case class ReplicaAssignment private (replicas: Seq[Int],
class ControllerContext {
val stats = new ControllerStats
var offlinePartitionCount = 0
var shuttingDownBrokerIds: mutable.Set[Int] = mutable.Set.empty
private var liveBrokers: Set[Broker] = Set.empty
private var liveBrokerEpochs: Map[Int, Long] = Map.empty
val shuttingDownBrokerIds = mutable.Set.empty[Int]
private val liveBrokers = mutable.Set.empty[Broker]
private val liveBrokerEpochs = mutable.Map.empty[Int, Long]
var epoch: Int = KafkaController.InitialControllerEpoch
var epochZkVersion: Int = KafkaController.InitialControllerEpochZkVersion
var allTopics: Set[String] = Set.empty
val allTopics = mutable.Set.empty[String]
val partitionAssignments = mutable.Map.empty[String, mutable.Map[Int, ReplicaAssignment]]
val partitionLeadershipInfo = mutable.Map.empty[TopicPartition, LeaderIsrAndControllerEpoch]
val partitionsBeingReassigned = mutable.Set.empty[TopicPartition]
val partitionStates = mutable.Map.empty[TopicPartition, PartitionState]
val replicaStates = mutable.Map.empty[PartitionAndReplica, ReplicaState]
val replicasOnOfflineDirs: mutable.Map[Int, Set[TopicPartition]] = mutable.Map.empty
val replicasOnOfflineDirs = mutable.Map.empty[Int, Set[TopicPartition]]
val topicsToBeDeleted = mutable.Set.empty[String]
@ -113,7 +113,7 @@ class ControllerContext {
val topicsIneligibleForDeletion = mutable.Set.empty[String]
private def clearTopicsState(): Unit = {
allTopics = Set.empty
allTopics.clear()
partitionAssignments.clear()
partitionLeadershipInfo.clear()
partitionsBeingReassigned.clear()
@ -124,8 +124,7 @@ class ControllerContext {
}
def partitionReplicaAssignment(topicPartition: TopicPartition): Seq[Int] = {
partitionAssignments.getOrElse(topicPartition.topic, mutable.Map.empty)
.get(topicPartition.partition) match {
partitionAssignments.getOrElse(topicPartition.topic, mutable.Map.empty).get(topicPartition.partition) match {
case Some(partitionAssignment) => partitionAssignment.replicas
case None => Seq.empty
}
@ -161,21 +160,24 @@ class ControllerContext {
}.toSet
}
def setLiveBrokerAndEpochs(brokerAndEpochs: Map[Broker, Long]): Unit = {
liveBrokers = brokerAndEpochs.keySet
liveBrokerEpochs =
brokerAndEpochs map { case (broker, brokerEpoch) => (broker.id, brokerEpoch)}
def setLiveBrokers(brokerAndEpochs: Map[Broker, Long]): Unit = {
clearLiveBrokers()
addLiveBrokers(brokerAndEpochs)
}
def addLiveBrokersAndEpochs(brokerAndEpochs: Map[Broker, Long]): Unit = {
liveBrokers = liveBrokers ++ brokerAndEpochs.keySet
liveBrokerEpochs = liveBrokerEpochs ++
(brokerAndEpochs map { case (broker, brokerEpoch) => (broker.id, brokerEpoch)})
private def clearLiveBrokers(): Unit = {
liveBrokers.clear()
liveBrokerEpochs.clear()
}
def addLiveBrokers(brokerAndEpochs: Map[Broker, Long]): Unit = {
liveBrokers ++= brokerAndEpochs.keySet
liveBrokerEpochs ++= brokerAndEpochs.map { case (broker, brokerEpoch) => (broker.id, brokerEpoch) }
}
def removeLiveBrokers(brokerIds: Set[Int]): Unit = {
liveBrokers = liveBrokers.filter(broker => !brokerIds.contains(broker.id))
liveBrokerEpochs = liveBrokerEpochs.filter { case (id, _) => !brokerIds.contains(id) }
liveBrokers --= liveBrokers.filter(broker => brokerIds.contains(broker.id))
liveBrokerEpochs --= brokerIds
}
def updateBrokerMetadata(oldMetadata: Broker, newMetadata: Broker): Unit = {
@ -184,7 +186,7 @@ class ControllerContext {
}
// getter
def liveBrokerIds: Set[Int] = liveBrokerEpochs.keySet -- shuttingDownBrokerIds
def liveBrokerIds: Set[Int] = liveBrokerEpochs.keySet.diff(shuttingDownBrokerIds)
def liveOrShuttingDownBrokerIds: Set[Int] = liveBrokerEpochs.keySet
def liveOrShuttingDownBrokers: Set[Broker] = liveBrokers
def liveBrokerIdAndEpochs: Map[Int, Long] = liveBrokerEpochs
@ -270,15 +272,20 @@ class ControllerContext {
epoch = 0
epochZkVersion = 0
clearTopicsState()
setLiveBrokerAndEpochs(Map.empty)
clearLiveBrokers()
}
def setAllTopics(topics: Set[String]): Unit = {
allTopics.clear()
allTopics ++= topics
}
def removeTopic(topic: String): Unit = {
allTopics -= topic
partitionAssignments.remove(topic)
partitionLeadershipInfo.foreach {
case (topicPartition, _) if topicPartition.topic == topic => partitionLeadershipInfo.remove(topicPartition)
case _ =>
partitionLeadershipInfo.foreach { case (topicPartition, _) =>
if (topicPartition.topic == topic)
partitionLeadershipInfo.remove(topicPartition)
}
}

View File

@ -27,7 +27,7 @@ import kafka.utils.ShutdownableThread
import org.apache.kafka.common.utils.Time
import scala.collection._
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
object ControllerEventManager {
val ControllerEventThreadName = "controller-event-thread"

View File

@ -39,7 +39,7 @@ import org.apache.kafka.common.utils.Time
import org.apache.zookeeper.KeeperException
import org.apache.zookeeper.KeeperException.Code
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
import scala.collection.{Map, Seq, Set, immutable, mutable}
import scala.collection.mutable.ArrayBuffer
import scala.util.{Failure, Try}
@ -348,7 +348,7 @@ class KafkaController(val config: KafkaConfig,
info(s"New broker startup callback for ${newBrokers.mkString(",")}")
newBrokers.foreach(controllerContext.replicasOnOfflineDirs.remove)
val newBrokersSet = newBrokers.toSet
val existingBrokers = controllerContext.liveOrShuttingDownBrokerIds -- newBrokers
val existingBrokers = controllerContext.liveOrShuttingDownBrokerIds.diff(newBrokersSet)
// Send update metadata request to all the existing brokers in the cluster so that they know about the new brokers
// via this update. No need to include any partition states in the request since there are no partition state changes.
sendUpdateMetadataRequest(existingBrokers.toSeq, Set.empty)
@ -725,9 +725,9 @@ class KafkaController(val config: KafkaConfig,
private def initializeControllerContext(): Unit = {
// update controller cache with delete topic information
val curBrokerAndEpochs = zkClient.getAllBrokerAndEpochsInCluster
controllerContext.setLiveBrokerAndEpochs(curBrokerAndEpochs)
controllerContext.setLiveBrokers(curBrokerAndEpochs)
info(s"Initialized broker epochs cache: ${controllerContext.liveBrokerIdAndEpochs}")
controllerContext.allTopics = zkClient.getAllTopicsInCluster(true)
controllerContext.setAllTopics(zkClient.getAllTopicsInCluster(true))
registerPartitionModificationsHandlers(controllerContext.allTopics.toSeq)
zkClient.getFullReplicaAssignmentForTopics(controllerContext.allTopics.toSet).foreach {
case (topicPartition, replicaAssignment) =>
@ -736,7 +736,7 @@ class KafkaController(val config: KafkaConfig,
controllerContext.partitionsBeingReassigned.add(topicPartition)
}
controllerContext.partitionLeadershipInfo.clear()
controllerContext.shuttingDownBrokerIds = mutable.Set.empty[Int]
controllerContext.shuttingDownBrokerIds.clear()
// register broker modifications handlers
registerBrokerModificationsHandler(controllerContext.liveOrShuttingDownBrokerIds)
// update the leader and isr cache for all existing partitions from Zookeeper
@ -848,8 +848,9 @@ class KafkaController(val config: KafkaConfig,
}
private def updateReplicaAssignmentForPartition(topicPartition: TopicPartition, assignment: ReplicaAssignment): Unit = {
var topicAssignment = controllerContext.partitionFullReplicaAssignmentForTopic(topicPartition.topic)
topicAssignment += topicPartition -> assignment
val topicAssignment = mutable.Map() ++=
controllerContext.partitionFullReplicaAssignmentForTopic(topicPartition.topic) +=
(topicPartition -> assignment)
val setDataResponse = zkClient.setTopicAssignmentRaw(topicPartition.topic, topicAssignment, controllerContext.epochZkVersion)
setDataResponse.resultCode match {
@ -1194,9 +1195,9 @@ class KafkaController(val config: KafkaConfig,
}
val previousOfflineReplicas = controllerContext.replicasOnOfflineDirs.getOrElse(brokerId, Set.empty[TopicPartition])
val currentOfflineReplicas = previousOfflineReplicas -- onlineReplicas ++ offlineReplicas
val currentOfflineReplicas = mutable.Set() ++= previousOfflineReplicas --= onlineReplicas ++= offlineReplicas
controllerContext.replicasOnOfflineDirs.put(brokerId, currentOfflineReplicas)
val newOfflineReplicas = currentOfflineReplicas -- previousOfflineReplicas
val newOfflineReplicas = currentOfflineReplicas.diff(previousOfflineReplicas)
if (newOfflineReplicas.nonEmpty) {
stateChangeLogger.info(s"Mark replicas ${newOfflineReplicas.mkString(",")} on broker $brokerId as offline")
@ -1221,7 +1222,7 @@ class KafkaController(val config: KafkaConfig,
topicDeletionManager.failReplicaDeletion(replicasInError)
if (replicasInError.size != partitionErrors.size) {
// some replicas could have been successfully deleted
val deletedReplicas = partitionErrors.keySet -- partitionsInError
val deletedReplicas = partitionErrors.keySet.diff(partitionsInError)
topicDeletionManager.completeReplicaDeletion(deletedReplicas.map(PartitionAndReplica(_, replicaId)))
}
}
@ -1364,8 +1365,8 @@ class KafkaController(val config: KafkaConfig,
val curBrokerIdAndEpochs = curBrokerAndEpochs map { case (broker, epoch) => (broker.id, epoch) }
val curBrokerIds = curBrokerIdAndEpochs.keySet
val liveOrShuttingDownBrokerIds = controllerContext.liveOrShuttingDownBrokerIds
val newBrokerIds = curBrokerIds -- liveOrShuttingDownBrokerIds
val deadBrokerIds = liveOrShuttingDownBrokerIds -- curBrokerIds
val newBrokerIds = curBrokerIds.diff(liveOrShuttingDownBrokerIds)
val deadBrokerIds = liveOrShuttingDownBrokerIds.diff(curBrokerIds)
val bouncedBrokerIds = (curBrokerIds & liveOrShuttingDownBrokerIds)
.filter(brokerId => curBrokerIdAndEpochs(brokerId) > controllerContext.liveBrokerIdAndEpochs(brokerId))
val newBrokerAndEpochs = curBrokerAndEpochs.filter { case (broker, _) => newBrokerIds.contains(broker.id) }
@ -1384,13 +1385,13 @@ class KafkaController(val config: KafkaConfig,
bouncedBrokerAndEpochs.keySet.foreach(controllerChannelManager.addBroker)
deadBrokerIds.foreach(controllerChannelManager.removeBroker)
if (newBrokerIds.nonEmpty) {
controllerContext.addLiveBrokersAndEpochs(newBrokerAndEpochs)
controllerContext.addLiveBrokers(newBrokerAndEpochs)
onBrokerStartup(newBrokerIdsSorted)
}
if (bouncedBrokerIds.nonEmpty) {
controllerContext.removeLiveBrokers(bouncedBrokerIds)
onBrokerFailure(bouncedBrokerIdsSorted)
controllerContext.addLiveBrokersAndEpochs(bouncedBrokerAndEpochs)
controllerContext.addLiveBrokers(bouncedBrokerAndEpochs)
onBrokerStartup(bouncedBrokerIdsSorted)
}
if (deadBrokerIds.nonEmpty) {
@ -1422,8 +1423,8 @@ class KafkaController(val config: KafkaConfig,
if (!isActive) return
val topics = zkClient.getAllTopicsInCluster(true)
val newTopics = topics -- controllerContext.allTopics
val deletedTopics = controllerContext.allTopics -- topics
controllerContext.allTopics = topics
val deletedTopics = controllerContext.allTopics.diff(topics)
controllerContext.setAllTopics(topics)
registerPartitionModificationsHandlers(newTopics.toSeq)
val addedPartitionReplicaAssignment = zkClient.getFullReplicaAssignmentForTopics(newTopics)

View File

@ -434,7 +434,7 @@ class ZkPartitionStateMachine(config: KafkaConfig,
val UpdateLeaderAndIsrResult(finishedUpdates, updatesToRetry) = zkClient.updateLeaderAndIsr(
adjustedLeaderAndIsrs, controllerContext.epoch, controllerContext.epochZkVersion)
finishedUpdates.foreach { case (partition, result) =>
result.right.foreach { leaderAndIsr =>
result.foreach { leaderAndIsr =>
val replicaAssignment = controllerContext.partitionFullReplicaAssignment(partition)
val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerContext.epoch)
controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)

View File

@ -332,14 +332,14 @@ class ZkReplicaStateMachine(config: KafkaConfig,
): (Map[TopicPartition, Either[Exception, LeaderIsrAndControllerEpoch]], Seq[TopicPartition]) = {
val (leaderAndIsrs, partitionsWithNoLeaderAndIsrInZk) = getTopicPartitionStatesFromZk(partitions)
val (leaderAndIsrsWithReplica, leaderAndIsrsWithoutReplica) = leaderAndIsrs.partition { case (_, result) =>
result.right.map { leaderAndIsr =>
result.map { leaderAndIsr =>
leaderAndIsr.isr.contains(replicaId)
}.right.getOrElse(false)
}.getOrElse(false)
}
val adjustedLeaderAndIsrs: Map[TopicPartition, LeaderAndIsr] = leaderAndIsrsWithReplica.flatMap {
case (partition, result) =>
result.right.toOption.map { leaderAndIsr =>
result.toOption.map { leaderAndIsr =>
val newLeader = if (replicaId == leaderAndIsr.leader) LeaderAndIsr.NoLeader else leaderAndIsr.leader
val adjustedIsr = if (leaderAndIsr.isr.size == 1) leaderAndIsr.isr else leaderAndIsr.isr.filter(_ != replicaId)
partition -> leaderAndIsr.newLeaderAndIsr(newLeader, adjustedIsr)
@ -347,10 +347,7 @@ class ZkReplicaStateMachine(config: KafkaConfig,
}
val UpdateLeaderAndIsrResult(finishedPartitions, updatesToRetry) = zkClient.updateLeaderAndIsr(
adjustedLeaderAndIsrs,
controllerContext.epoch,
controllerContext.epochZkVersion
)
adjustedLeaderAndIsrs, controllerContext.epoch, controllerContext.epochZkVersion)
val exceptionsForPartitionsWithNoLeaderAndIsrInZk: Map[TopicPartition, Either[Exception, LeaderIsrAndControllerEpoch]] =
partitionsWithNoLeaderAndIsrInZk.iterator.flatMap { partition =>
@ -364,21 +361,15 @@ class ZkReplicaStateMachine(config: KafkaConfig,
}.toMap
val leaderIsrAndControllerEpochs: Map[TopicPartition, Either[Exception, LeaderIsrAndControllerEpoch]] =
(leaderAndIsrsWithoutReplica ++ finishedPartitions).map { case (partition, result: Either[Exception, LeaderAndIsr]) =>
(
partition,
result.right.map { leaderAndIsr =>
(leaderAndIsrsWithoutReplica ++ finishedPartitions).map { case (partition, result) =>
(partition, result.map { leaderAndIsr =>
val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerContext.epoch)
controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
leaderIsrAndControllerEpoch
}
)
})
}
(
leaderIsrAndControllerEpochs ++ exceptionsForPartitionsWithNoLeaderAndIsrInZk,
updatesToRetry
)
(leaderIsrAndControllerEpochs ++ exceptionsForPartitionsWithNoLeaderAndIsrInZk, updatesToRetry)
}
/**

View File

@ -257,7 +257,7 @@ class TopicDeletionManager(config: KafkaConfig,
* removed from their caches.
*/
private def onTopicDeletion(topics: Set[String]): Unit = {
val unseenTopicsForDeletion = topics -- controllerContext.topicsWithDeletionStarted
val unseenTopicsForDeletion = topics.diff(controllerContext.topicsWithDeletionStarted)
if (unseenTopicsForDeletion.nonEmpty) {
val unseenPartitionsForDeletion = unseenTopicsForDeletion.flatMap(controllerContext.partitionsForTopic)
partitionStateMachine.handleStateChanges(unseenPartitionsForDeletion.toSeq, OfflinePartition)
@ -295,7 +295,7 @@ class TopicDeletionManager(config: KafkaConfig,
}
val successfullyDeletedReplicas = controllerContext.replicasInState(topic, ReplicaDeletionSuccessful)
val replicasForDeletionRetry = aliveReplicas -- successfullyDeletedReplicas
val replicasForDeletionRetry = aliveReplicas.diff(successfullyDeletedReplicas)
allDeadReplicas ++= deadReplicas
allReplicasForDeletionRetry ++= replicasForDeletionRetry

View File

@ -35,7 +35,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests._
import org.apache.kafka.common.utils.Time
import scala.collection.{Map, Seq, immutable}
import scala.collection.{Map, Seq, immutable, mutable}
import scala.math.max
/**
@ -423,7 +423,7 @@ class GroupCoordinator(val brokerId: Int,
info(s"Assignment received from leader for group ${group.groupId} for generation ${group.generationId}")
// fill any missing members with an empty assignment
val missing = group.allMembers -- groupAssignment.keySet
val missing = group.allMembers.diff(groupAssignment.keySet)
val assignment = groupAssignment ++ missing.map(_ -> Array.empty[Byte]).toMap
if (missing.nonEmpty) {
@ -519,8 +519,8 @@ class GroupCoordinator(val brokerId: Int,
}
def handleDeleteGroups(groupIds: Set[String]): Map[String, Errors] = {
var groupErrors: Map[String, Errors] = Map()
var groupsEligibleForDeletion: Seq[GroupMetadata] = Seq()
val groupErrors = mutable.Map.empty[String, Errors]
val groupsEligibleForDeletion = mutable.ArrayBuffer[GroupMetadata]()
groupIds.foreach { groupId =>
validateGroupStatus(groupId, ApiKeys.DELETE_GROUPS) match {
@ -540,9 +540,9 @@ class GroupCoordinator(val brokerId: Int,
(if (groupManager.groupNotExists(groupId)) Errors.GROUP_ID_NOT_FOUND else Errors.NOT_COORDINATOR)
case Empty =>
group.transitionTo(Dead)
groupsEligibleForDeletion :+= group
groupsEligibleForDeletion += group
case Stable | PreparingRebalance | CompletingRebalance =>
groupErrors += groupId -> Errors.NON_EMPTY_GROUP
groupErrors(groupId) = Errors.NON_EMPTY_GROUP
}
}
}

View File

@ -30,7 +30,7 @@ import org.apache.kafka.common.protocol.types.SchemaException
import org.apache.kafka.common.utils.Time
import scala.collection.{Seq, immutable, mutable}
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
private[group] sealed trait GroupState {
val validPreviousStates: Set[GroupState]
@ -411,22 +411,21 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
val candidates = candidateProtocols
// let each member vote for one of the protocols and choose the one with the most votes
val votes: List[(String, Int)] = allMemberMetadata
val (protocol, _) = allMemberMetadata
.map(_.vote(candidates))
.groupBy(identity)
.mapValues(_.size)
.toList
.maxBy { case (_, votes) => votes.size }
votes.maxBy(_._2)._1
protocol
}
private def candidateProtocols = {
private def candidateProtocols: Set[String] = {
// get the set of protocols that are commonly supported by all members
val numMembers = members.size
supportedProtocols.filter(_._2 == numMembers).map(_._1).toSet
}
def supportsProtocols(memberProtocolType: String, memberProtocols: Set[String]) = {
def supportsProtocols(memberProtocolType: String, memberProtocols: Set[String]): Boolean = {
if (is(Empty))
!memberProtocolType.isEmpty && memberProtocols.nonEmpty
else
@ -467,12 +466,11 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
}.reduceLeft(_ ++ _)
)
} catch {
case e: SchemaException => {
case e: SchemaException =>
warn(s"Failed to parse Consumer Protocol ${ConsumerProtocol.PROTOCOL_TYPE}:${protocolName.get} " +
s"of group $groupId. Consumer group coordinator is not aware of the subscribed topics.", e)
None
}
}
case Some(ConsumerProtocol.PROTOCOL_TYPE) if members.isEmpty =>
Option(Set.empty)
@ -483,7 +481,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
def updateMember(member: MemberMetadata,
protocols: List[(String, Array[Byte])],
callback: JoinCallback) = {
callback: JoinCallback): Unit = {
member.supportedProtocols.foreach{ case (protocol, _) => supportedProtocols(protocol) -= 1 }
protocols.foreach{ case (protocol, _) => supportedProtocols(protocol) += 1 }
member.supportedProtocols = protocols
@ -519,7 +517,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
}
}
def initNextGeneration() = {
def initNextGeneration(): Unit = {
if (members.nonEmpty) {
generationId += 1
protocolName = Some(selectProtocol)

View File

@ -49,7 +49,7 @@ import org.apache.kafka.common.requests.{OffsetCommitRequest, OffsetFetchRespons
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{KafkaException, TopicPartition}
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
import scala.collection._
import scala.collection.mutable.ArrayBuffer
@ -534,7 +534,7 @@ class GroupMetadataManager(brokerId: Int,
doLoadGroupsAndOffsets(topicPartition, onGroupLoaded)
val endTimeMs = time.milliseconds()
val totalLoadingTimeMs = endTimeMs - startTimeMs
partitionLoadSensor.record(totalLoadingTimeMs, endTimeMs, false)
partitionLoadSensor.record(totalLoadingTimeMs.toDouble, endTimeMs, false)
info(s"Finished loading offsets and group metadata from $topicPartition "
+ s"in $totalLoadingTimeMs milliseconds, of which $schedulerTimeMs milliseconds"
+ s" was spent in the scheduler.")
@ -666,19 +666,21 @@ class GroupMetadataManager(brokerId: Int,
val (groupOffsets, emptyGroupOffsets) = loadedOffsets
.groupBy(_._1.group)
.mapValues(_.map { case (groupTopicPartition, offset) => (groupTopicPartition.topicPartition, offset) })
.partition { case (group, _) => loadedGroups.contains(group) }
.map { case (k, v) =>
k -> v.map { case (groupTopicPartition, offset) => (groupTopicPartition.topicPartition, offset) }
}.partition { case (group, _) => loadedGroups.contains(group) }
val pendingOffsetsByGroup = mutable.Map[String, mutable.Map[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]]]()
pendingOffsets.foreach { case (producerId, producerOffsets) =>
producerOffsets.keySet.map(_.group).foreach(addProducerGroup(producerId, _))
producerOffsets
.groupBy(_._1.group)
.mapValues(_.map { case (groupTopicPartition, offset) => (groupTopicPartition.topicPartition, offset)})
.foreach { case (group, offsets) =>
val groupPendingOffsets = pendingOffsetsByGroup.getOrElseUpdate(group, mutable.Map.empty[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]])
val groupProducerOffsets = groupPendingOffsets.getOrElseUpdate(producerId, mutable.Map.empty[TopicPartition, CommitRecordMetadataAndOffset])
groupProducerOffsets ++= offsets
groupProducerOffsets ++= offsets.map { case (groupTopicPartition, offset) =>
(groupTopicPartition.topicPartition, offset)
}
}
}

View File

@ -22,7 +22,7 @@ import kafka.utils.{Json, Logging}
import kafka.zk.{KafkaZkClient, ProducerIdBlockZNode}
import org.apache.kafka.common.KafkaException
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
/**
* ProducerIdManager is the part of the transaction coordinator that provides ProducerIds in a unique way

View File

@ -118,7 +118,7 @@ class TransactionCoordinator(brokerId: Int,
// check transactionTimeoutMs is not larger than the broker configured maximum allowed value
responseCallback(initTransactionError(Errors.INVALID_TRANSACTION_TIMEOUT))
} else {
val coordinatorEpochAndMetadata = txnManager.getTransactionState(transactionalId).right.flatMap {
val coordinatorEpochAndMetadata = txnManager.getTransactionState(transactionalId).flatMap {
case None =>
val producerId = producerIdManager.generateProducerId()
val createdMetadata = new TransactionMetadata(transactionalId = transactionalId,
@ -135,7 +135,7 @@ class TransactionCoordinator(brokerId: Int,
case Some(epochAndTxnMetadata) => Right(epochAndTxnMetadata)
}
val result: ApiResult[(Int, TxnTransitMetadata)] = coordinatorEpochAndMetadata.right.flatMap {
val result: ApiResult[(Int, TxnTransitMetadata)] = coordinatorEpochAndMetadata.flatMap {
existingEpochAndMetadata =>
val coordinatorEpoch = existingEpochAndMetadata.coordinatorEpoch
val txnMetadata = existingEpochAndMetadata.transactionMetadata
@ -266,7 +266,7 @@ class TransactionCoordinator(brokerId: Int,
} else {
// try to update the transaction metadata and append the updated metadata to txn log;
// if there is no such metadata treat it as invalid producerId mapping error.
val result: ApiResult[(Int, TxnTransitMetadata)] = txnManager.getTransactionState(transactionalId).right.flatMap {
val result: ApiResult[(Int, TxnTransitMetadata)] = txnManager.getTransactionState(transactionalId).flatMap {
case None => Left(Errors.INVALID_PRODUCER_ID_MAPPING)
case Some(epochAndMetadata) =>
@ -368,7 +368,7 @@ class TransactionCoordinator(brokerId: Int,
if (transactionalId == null || transactionalId.isEmpty)
responseCallback(Errors.INVALID_REQUEST)
else {
val preAppendResult: ApiResult[(Int, TxnTransitMetadata)] = txnManager.getTransactionState(transactionalId).right.flatMap {
val preAppendResult: ApiResult[(Int, TxnTransitMetadata)] = txnManager.getTransactionState(transactionalId).flatMap {
case None =>
Left(Errors.INVALID_PRODUCER_ID_MAPPING)
@ -440,7 +440,7 @@ class TransactionCoordinator(brokerId: Int,
case Right((coordinatorEpoch, newMetadata)) =>
def sendTxnMarkersCallback(error: Errors): Unit = {
if (error == Errors.NONE) {
val preSendResult: ApiResult[(TransactionMetadata, TxnTransitMetadata)] = txnManager.getTransactionState(transactionalId).right.flatMap {
val preSendResult: ApiResult[(TransactionMetadata, TxnTransitMetadata)] = txnManager.getTransactionState(transactionalId).flatMap {
case None =>
val errorMsg = s"The coordinator still owns the transaction partition for $transactionalId, but there is " +
s"no metadata in the cache; this is not expected"
@ -535,7 +535,7 @@ class TransactionCoordinator(brokerId: Int,
private[transaction] def abortTimedOutTransactions(onComplete: TransactionalIdAndProducerIdEpoch => EndTxnCallback): Unit = {
txnManager.timedOutTransactions().foreach { txnIdAndPidEpoch =>
txnManager.getTransactionState(txnIdAndPidEpoch.transactionalId).right.foreach {
txnManager.getTransactionState(txnIdAndPidEpoch.transactionalId).foreach {
case None =>
error(s"Could not find transaction metadata when trying to timeout transaction for $txnIdAndPidEpoch")

View File

@ -34,7 +34,7 @@ import org.apache.kafka.common.security.JaasContext
import org.apache.kafka.common.utils.{LogContext, Time}
import org.apache.kafka.common.{Node, Reconfigurable, TopicPartition}
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
import scala.collection.{concurrent, immutable}
object TransactionMarkerChannelManager {

View File

@ -24,7 +24,7 @@ import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.WriteTxnMarkersResponse
import scala.collection.mutable
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
class TransactionMarkerRequestCompletionHandler(brokerId: Int,
txnStateManager: TransactionStateManager,

View File

@ -38,7 +38,7 @@ import org.apache.kafka.common.requests.TransactionResult
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{KafkaException, TopicPartition}
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
import scala.collection.mutable
@ -216,8 +216,8 @@ class TransactionStateManager(brokerId: Int,
}
def putTransactionStateIfNotExists(txnMetadata: TransactionMetadata): Either[Errors, CoordinatorEpochAndTxnMetadata] = {
getAndMaybeAddTransactionState(txnMetadata.transactionalId, Some(txnMetadata))
.right.map(_.getOrElse(throw new IllegalStateException(s"Unexpected empty transaction metadata returned while putting $txnMetadata")))
getAndMaybeAddTransactionState(txnMetadata.transactionalId, Some(txnMetadata)).map(_.getOrElse(
throw new IllegalStateException(s"Unexpected empty transaction metadata returned while putting $txnMetadata")))
}
/**
@ -391,7 +391,7 @@ class TransactionStateManager(brokerId: Int,
val loadedTransactions = loadTransactionMetadata(topicPartition, coordinatorEpoch)
val endTimeMs = time.milliseconds()
val totalLoadingTimeMs = endTimeMs - startTimeMs
partitionLoadSensor.record(totalLoadingTimeMs, endTimeMs, false)
partitionLoadSensor.record(totalLoadingTimeMs.toDouble, endTimeMs, false)
info(s"Finished loading ${loadedTransactions.size} transaction metadata from $topicPartition in " +
s"$totalLoadingTimeMs milliseconds, of which $schedulerTimeMs milliseconds was spent in the scheduler.")
@ -439,7 +439,7 @@ class TransactionStateManager(brokerId: Int,
def removeTransactionsForTxnTopicPartition(partitionId: Int): Unit = {
val topicPartition = new TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partitionId)
inWriteLock(stateLock) {
loadingPartitions.retain(_.txnPartitionId != partitionId)
loadingPartitions --= loadingPartitions.filter(_.txnPartitionId == partitionId)
transactionMetadataCache.remove(partitionId).foreach { txnMetadataCacheEntry =>
info(s"Unloaded transaction metadata $txnMetadataCacheEntry for $topicPartition following " +
s"local partition deletion")

View File

@ -44,7 +44,7 @@ import org.apache.kafka.common.requests.{EpochEndOffset, ListOffsetRequest}
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{InvalidRecordException, KafkaException, TopicPartition}
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
import scala.collection.{Seq, Set, mutable}
@ -531,8 +531,8 @@ class Log(@volatile private var _dir: File,
Files.deleteIfExists(Log.transactionIndexFile(dir, offset, suffix).toPath)
}
var swapFiles = Set[File]()
var cleanFiles = Set[File]()
val swapFiles = mutable.Set[File]()
val cleanFiles = mutable.Set[File]()
var minCleanedFileOffset = Long.MaxValue
for (file <- dir.listFiles if file.isFile) {
@ -767,7 +767,7 @@ class Log(@volatile private var _dir: File,
// if we have the clean shutdown marker, skip recovery
if (!hasCleanShutdownFile) {
// okay we need to actually recover this log
val unflushed = logSegments(this.recoveryPoint, Long.MaxValue).toIterator
val unflushed = logSegments(this.recoveryPoint, Long.MaxValue).iterator
var truncated = false
while (unflushed.hasNext && !truncated) {

View File

@ -34,7 +34,7 @@ import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention
import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.Time
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
import scala.collection.mutable.ListBuffer
import scala.collection.{Iterable, Seq, Set, mutable}
import scala.util.control.ControlThrowable
@ -381,19 +381,19 @@ class LogCleaner(initialConfig: CleanerConfig,
def mb(bytes: Double) = bytes / (1024*1024)
val message =
"%n\tLog cleaner thread %d cleaned log %s (dirty section = [%d, %d])%n".format(id, name, from, to) +
"\t%,.1f MB of log processed in %,.1f seconds (%,.1f MB/sec).%n".format(mb(stats.bytesRead),
"\t%,.1f MB of log processed in %,.1f seconds (%,.1f MB/sec).%n".format(mb(stats.bytesRead.toDouble),
stats.elapsedSecs,
mb(stats.bytesRead/stats.elapsedSecs)) +
"\tIndexed %,.1f MB in %.1f seconds (%,.1f Mb/sec, %.1f%% of total time)%n".format(mb(stats.mapBytesRead),
mb(stats.bytesRead.toDouble / stats.elapsedSecs)) +
"\tIndexed %,.1f MB in %.1f seconds (%,.1f Mb/sec, %.1f%% of total time)%n".format(mb(stats.mapBytesRead.toDouble),
stats.elapsedIndexSecs,
mb(stats.mapBytesRead)/stats.elapsedIndexSecs,
100 * stats.elapsedIndexSecs/stats.elapsedSecs) +
mb(stats.mapBytesRead.toDouble) / stats.elapsedIndexSecs,
100 * stats.elapsedIndexSecs / stats.elapsedSecs) +
"\tBuffer utilization: %.1f%%%n".format(100 * stats.bufferUtilization) +
"\tCleaned %,.1f MB in %.1f seconds (%,.1f Mb/sec, %.1f%% of total time)%n".format(mb(stats.bytesRead),
"\tCleaned %,.1f MB in %.1f seconds (%,.1f Mb/sec, %.1f%% of total time)%n".format(mb(stats.bytesRead.toDouble),
stats.elapsedSecs - stats.elapsedIndexSecs,
mb(stats.bytesRead)/(stats.elapsedSecs - stats.elapsedIndexSecs), 100 * (stats.elapsedSecs - stats.elapsedIndexSecs).toDouble/stats.elapsedSecs) +
"\tStart size: %,.1f MB (%,d messages)%n".format(mb(stats.bytesRead), stats.messagesRead) +
"\tEnd size: %,.1f MB (%,d messages)%n".format(mb(stats.bytesWritten), stats.messagesWritten) +
mb(stats.bytesRead.toDouble) / (stats.elapsedSecs - stats.elapsedIndexSecs), 100 * (stats.elapsedSecs - stats.elapsedIndexSecs) / stats.elapsedSecs) +
"\tStart size: %,.1f MB (%,d messages)%n".format(mb(stats.bytesRead.toDouble), stats.messagesRead) +
"\tEnd size: %,.1f MB (%,d messages)%n".format(mb(stats.bytesWritten.toDouble), stats.messagesWritten) +
"\t%.1f%% size reduction (%.1f%% fewer messages)%n".format(100.0 * (1.0 - stats.bytesWritten.toDouble/stats.bytesRead),
100.0 * (1.0 - stats.messagesWritten.toDouble/stats.messagesRead))
info(message)
@ -1034,9 +1034,9 @@ private class CleanerStats(time: Time = Time.SYSTEM) {
endTime = time.milliseconds
}
def elapsedSecs = (endTime - startTime)/1000.0
def elapsedSecs: Double = (endTime - startTime) / 1000.0
def elapsedIndexSecs = (mapCompleteTime - startTime)/1000.0
def elapsedIndexSecs: Double = (mapCompleteTime - startTime) / 1000.0
}

View File

@ -410,7 +410,7 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
if (checkpoint != null) {
val existing = checkpoint.read()
if (existing.getOrElse(topicPartition, 0L) > offset)
checkpoint.write(existing + (topicPartition -> offset))
checkpoint.write(mutable.Map() ++= existing += topicPartition -> offset)
}
}
}

View File

@ -19,7 +19,7 @@ package kafka.log
import java.util.{Collections, Locale, Properties}
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
import kafka.api.{ApiVersion, ApiVersionValidator}
import kafka.message.BrokerCompressionCodec
import kafka.server.{KafkaConfig, ThrottledReplicaListValidator}

View File

@ -30,7 +30,7 @@ import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.errors.{KafkaStorageException, LogDirNotFoundException}
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
import scala.collection._
import scala.collection.mutable.ArrayBuffer
import scala.util.{Failure, Success, Try}
@ -741,7 +741,7 @@ class LogManager(logDirs: Seq[File],
}
val logDir = logDirs
.toStream // to prevent actually mapping the whole list, lazy map
.iterator // to prevent actually mapping the whole list, lazy map
.map(createLogDirectory(_, logDirName))
.find(_.isSuccess)
.getOrElse(Failure(new KafkaStorageException("No log directories available. Tried " + logDirs.map(_.getAbsolutePath).mkString(", "))))
@ -934,7 +934,7 @@ class LogManager(logDirs: Seq[File],
List(_liveLogDirs.peek())
} else {
// count the number of logs in each parent directory (including 0 for empty directories
val logCounts = allLogs.groupBy(_.parentDir).mapValues(_.size)
val logCounts = allLogs.groupBy(_.parentDir).map { case (parent, logs) => parent -> logs.size }
val zeros = _liveLogDirs.asScala.map(dir => (dir.getPath, 0)).toMap
val dirCounts = (zeros ++ logCounts).toBuffer

View File

@ -32,7 +32,7 @@ import org.apache.kafka.common.record.FileRecords.{LogOffsetPosition, TimestampA
import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.Time
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
import scala.math._
/**

View File

@ -32,7 +32,7 @@ import org.apache.kafka.common.requests.ProduceResponse.RecordError
import org.apache.kafka.common.utils.Time
import scala.collection.{Seq, mutable}
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
import scala.collection.mutable.ArrayBuffer
/**

View File

@ -147,7 +147,7 @@ class OffsetIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writabl
mmap.putInt(position)
_entries += 1
_lastOffset = offset
require(_entries * entrySize == mmap.position(), entries + " entries but file position in index is " + mmap.position() + ".")
require(_entries * entrySize == mmap.position(), s"$entries entries but file position in index is ${mmap.position()}.")
} else {
throw new InvalidOffsetException(s"Attempt to append an offset ($offset) to position $entries no larger than" +
s" the last offset appended (${_lastOffset}) to ${file.getAbsolutePath}.")

View File

@ -30,7 +30,7 @@ import org.apache.kafka.common.protocol.types._
import org.apache.kafka.common.record.{ControlRecordType, DefaultRecordBatch, EndTransactionMarker, RecordBatch}
import org.apache.kafka.common.utils.{ByteUtils, Crc32C}
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
import scala.collection.mutable.ListBuffer
import scala.collection.{immutable, mutable}
@ -580,9 +580,7 @@ class ProducerStateManager(val topicPartition: TopicPartition,
* Expire any producer ids which have been idle longer than the configured maximum expiration timeout.
*/
def removeExpiredProducers(currentTimeMs: Long): Unit = {
producers.retain { case (_, lastEntry) =>
!isProducerExpired(currentTimeMs, lastEntry)
}
producers --= producers.filter { case (_, lastEntry) => isProducerExpired(currentTimeMs, lastEntry) }.keySet
}
/**

View File

@ -135,7 +135,7 @@ class TimeIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable:
mmap.putInt(relativeOffset(offset))
_entries += 1
_lastEntry = TimestampOffset(timestamp, offset)
require(_entries * entrySize == mmap.position(), _entries + " entries but file position in index is " + mmap.position() + ".")
require(_entries * entrySize == mmap.position(), s"${_entries} entries but file position in index is ${mmap.position()}.")
}
}
}

View File

@ -39,7 +39,7 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.{Sanitizer, Time}
import scala.collection.mutable
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
import scala.reflect.ClassTag
object RequestChannel extends Logging {

View File

@ -51,7 +51,7 @@ import org.apache.kafka.common.utils.{KafkaThread, LogContext, Time}
import org.slf4j.event.Level
import scala.collection._
import JavaConverters._
import scala.jdk.CollectionConverters._
import scala.collection.mutable.{ArrayBuffer, Buffer}
import scala.util.control.ControlThrowable
@ -180,7 +180,7 @@ class SocketServer(val config: KafkaConfig,
.find(_.listenerName == config.interBrokerListenerName)
.getOrElse(throw new IllegalStateException(s"Inter-broker listener ${config.interBrokerListenerName} not found, endpoints=${dataPlaneAcceptors.keySet}"))
val orderedAcceptors = List(dataPlaneAcceptors.get(interBrokerListener)) ++
dataPlaneAcceptors.asScala.filterKeys(_ != interBrokerListener).values
dataPlaneAcceptors.asScala.filter { case (k, _) => k != interBrokerListener }.values
orderedAcceptors.foreach { acceptor =>
val endpoint = acceptor.endPoint
debug(s"Wait for authorizer to complete start up on listener ${endpoint.listenerName}")

View File

@ -70,7 +70,7 @@ case class Resource(resourceType: ResourceType, name: String, patternType: Patte
* @deprecated Since 2.0, use [[kafka.security.auth.Resource(ResourceType, String, PatternType)]]
*/
@deprecated("Use Resource(ResourceType, String, PatternType", "Since 2.0")
def this(resourceType: ResourceType, name: String) {
def this(resourceType: ResourceType, name: String) = {
this(resourceType, name, PatternType.LITERAL)
}

View File

@ -30,7 +30,7 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext, AuthorizationResult}
import scala.collection.mutable
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
import scala.compat.java8.OptionConverters._
@deprecated("Use kafka.security.authorizer.AclAuthorizer", "Since 2.4")

View File

@ -21,7 +21,7 @@ import java.util.concurrent.{CompletableFuture, CompletionStage}
import com.typesafe.scalalogging.Logger
import kafka.api.KAFKA_2_0_IV1
import kafka.security.authorizer.AclAuthorizer.{AclBuffers, ResourceOrdering, VersionedAcls}
import kafka.security.authorizer.AclAuthorizer.{AclSeqs, ResourceOrdering, VersionedAcls}
import kafka.security.authorizer.AclEntry.ResourceSeparator
import kafka.server.{KafkaConfig, KafkaServer}
import kafka.utils._
@ -39,8 +39,8 @@ import org.apache.kafka.server.authorizer.AclDeleteResult.AclBindingDeleteResult
import org.apache.kafka.server.authorizer._
import org.apache.zookeeper.client.ZKClientConfig
import scala.collection.mutable
import scala.collection.JavaConverters._
import scala.collection.{mutable, Seq}
import scala.jdk.CollectionConverters._
import scala.util.{Failure, Random, Success, Try}
object AclAuthorizer {
@ -62,7 +62,7 @@ object AclAuthorizer {
def exists: Boolean = zkVersion != ZkVersion.UnknownVersion
}
class AclBuffers(classes: mutable.Buffer[AclEntry]*) {
class AclSeqs(classes: Seq[AclEntry]*) {
def find(p: AclEntry => Boolean): Option[AclEntry] = classes.flatMap(_.find(p)).headOption
def isEmpty: Boolean = !classes.exists(_.nonEmpty)
}
@ -255,8 +255,9 @@ class AclAuthorizer extends Authorizer with Logging {
}
}
}
val deletedResult = deletedBindings.groupBy(_._2)
.mapValues(_.map { case (binding, _) => new AclBindingDeleteResult(binding, deleteExceptions.getOrElse(binding, null)) })
val deletedResult = deletedBindings.groupBy(_._2).map { case (k, bindings) =>
k -> bindings.keys.map { binding => new AclBindingDeleteResult(binding, deleteExceptions.get(binding).orNull) }
}
(0 until aclBindingFilters.size).map { i =>
new AclDeleteResult(deletedResult.getOrElse(i, Set.empty[AclBindingDeleteResult]).toSet.asJava)
}.map(CompletableFuture.completedFuture[AclDeleteResult]).asJava
@ -295,7 +296,7 @@ class AclAuthorizer extends Authorizer with Logging {
val host = requestContext.clientAddress.getHostAddress
val operation = action.operation
def isEmptyAclAndAuthorized(acls: AclBuffers): Boolean = {
def isEmptyAclAndAuthorized(acls: AclSeqs): Boolean = {
if (acls.isEmpty) {
// No ACLs found for this resource, permission is determined by value of config allow.everyone.if.no.acl.found
authorizerLogger.debug(s"No acl found for resource $resource, authorized = $shouldAllowEveryoneIfNoAclIsFound")
@ -303,12 +304,12 @@ class AclAuthorizer extends Authorizer with Logging {
} else false
}
def denyAclExists(acls: AclBuffers): Boolean = {
def denyAclExists(acls: AclSeqs): Boolean = {
// Check if there are any Deny ACLs which would forbid this operation.
matchingAclExists(operation, resource, principal, host, DENY, acls)
}
def allowAclExists(acls: AclBuffers): Boolean = {
def allowAclExists(acls: AclSeqs): Boolean = {
// Check if there are any Allow ACLs which would allow this operation.
// Allowing read, write, delete, or alter implies allowing describe.
// See #{org.apache.kafka.common.acl.AclOperation} for more details about ACL inheritance.
@ -341,7 +342,7 @@ class AclAuthorizer extends Authorizer with Logging {
} else false
}
private def matchingAcls(resourceType: ResourceType, resourceName: String): AclBuffers = {
private def matchingAcls(resourceType: ResourceType, resourceName: String): AclSeqs = {
// save aclCache reference to a local val to get a consistent view of the cache during acl updates.
val aclCacheSnapshot = aclCache
val wildcard = aclCacheSnapshot.get(new ResourcePattern(resourceType, ResourcePattern.WILDCARD_RESOURCE, PatternType.LITERAL))
@ -355,12 +356,10 @@ class AclAuthorizer extends Authorizer with Logging {
val prefixed = aclCacheSnapshot
.from(new ResourcePattern(resourceType, resourceName, PatternType.PREFIXED))
.to(new ResourcePattern(resourceType, resourceName.take(1), PatternType.PREFIXED))
.filterKeys(resource => resourceName.startsWith(resource.name))
.values
.flatMap { _.acls.toBuffer }
.flatMap { case (resource, acls) => if (resourceName.startsWith(resource.name)) acls.acls else Seq.empty }
.toBuffer
new AclBuffers(prefixed, wildcard, literal)
new AclSeqs(prefixed, wildcard, literal)
}
private def matchingAclExists(operation: AclOperation,
@ -368,7 +367,7 @@ class AclAuthorizer extends Authorizer with Logging {
principal: KafkaPrincipal,
host: String,
permissionType: AclPermissionType,
acls: AclBuffers): Boolean = {
acls: AclSeqs): Boolean = {
acls.find { acl =>
acl.permissionType == permissionType &&
(acl.kafkaPrincipal == principal || acl.kafkaPrincipal == AclEntry.WildcardPrincipal) &&

View File

@ -25,7 +25,7 @@ import org.apache.kafka.common.resource.{ResourcePattern, ResourceType}
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.SecurityUtils
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
object AclEntry {
val WildcardPrincipal: KafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "*")

View File

@ -33,7 +33,7 @@ import org.apache.kafka.common.utils.SecurityUtils.parseKafkaPrincipal
import org.apache.kafka.server.authorizer.AclDeleteResult.AclBindingDeleteResult
import org.apache.kafka.server.authorizer.{AuthorizableRequestContext, AuthorizerServerInfo, _}
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
import scala.collection.mutable.ArrayBuffer
import scala.collection.{Seq, immutable, mutable}
import scala.util.{Failure, Success, Try}

View File

@ -32,7 +32,7 @@ import kafka.utils.CoreUtils.inLock
import org.apache.kafka.common.protocol.Errors
import scala.collection.{mutable, Map, Set}
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicLong

View File

@ -46,7 +46,7 @@ import org.apache.kafka.common.requests.{AlterConfigsRequest, ApiError, Describe
import org.apache.kafka.common.utils.Sanitizer
import scala.collection.{Map, mutable, _}
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
class AdminManager(val config: KafkaConfig,
val metrics: Metrics,
@ -518,8 +518,9 @@ class AdminManager(val config: KafkaConfig,
configs.map { case (resource, alterConfigOps) =>
try {
// throw InvalidRequestException if any duplicate keys
val duplicateKeys = alterConfigOps.groupBy(config => config.configEntry().name())
.mapValues(_.size).filter(_._2 > 1).keys.toSet
val duplicateKeys = alterConfigOps.groupBy(config => config.configEntry.name).filter { case (_, v) =>
v.size > 1
}.keySet
if (duplicateKeys.nonEmpty)
throw new InvalidRequestException(s"Error due to duplicate config keys : ${duplicateKeys.mkString(",")}")
val nullUpdates = alterConfigOps
@ -773,7 +774,7 @@ class AdminManager(val config: KafkaConfig,
}
def handleDescribeClientQuotas(userComponent: Option[ClientQuotaFilterComponent],
clientIdComponent: Option[ClientQuotaFilterComponent], strict: Boolean) = {
clientIdComponent: Option[ClientQuotaFilterComponent], strict: Boolean): Map[ClientQuotaEntity, Map[String, Double]] = {
def toOption(opt: java.util.Optional[String]): Option[String] =
if (opt == null)

View File

@ -66,7 +66,7 @@ class BrokerMetadataCheckpoint(val file: File) extends Logging {
}
def read(): Option[BrokerMetadata] = {
Files.deleteIfExists(new File(file + ".tmp").toPath()) // try to delete any existing temp files for cleanliness
Files.deleteIfExists(new File(file.getPath + ".tmp").toPath()) // try to delete any existing temp files for cleanliness
lock synchronized {
try {

View File

@ -32,7 +32,7 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.{Sanitizer, Time}
import org.apache.kafka.server.quota.{ClientQuotaCallback, ClientQuotaEntity, ClientQuotaType}
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
/**
* Represents the sensors aggregated per client
@ -162,7 +162,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
private val time: Time,
threadNamePrefix: String,
clientQuotaCallback: Option[ClientQuotaCallback] = None) extends Logging {
private val staticConfigClientIdQuota = Quota.upperBound(config.quotaBytesPerSecondDefault)
private val staticConfigClientIdQuota = Quota.upperBound(config.quotaBytesPerSecondDefault.toDouble)
private val clientQuotaType = quotaTypeToClientQuotaType(quotaType)
@volatile private var quotaTypesEnabled = clientQuotaCallback match {
case Some(_) => QuotaTypes.CustomQuotas
@ -176,7 +176,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
private[server] val throttledChannelReaper = new ThrottledChannelReaper(delayQueue, threadNamePrefix)
private val quotaCallback = clientQuotaCallback.getOrElse(new DefaultQuotaCallback)
private val delayQueueSensor = metrics.sensor(quotaType + "-delayQueue")
private val delayQueueSensor = metrics.sensor(quotaType.toString + "-delayQueue")
delayQueueSensor.add(metrics.metricName("queue-size",
quotaType.toString,
"Tracks the size of the delay queue"), new CumulativeSum())
@ -506,8 +506,8 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
}
} else {
val quotaMetricName = clientRateMetricName(Map.empty)
allMetrics.asScala.filterKeys(n => n.name == quotaMetricName.name && n.group == quotaMetricName.group).foreach {
case (metricName, metric) =>
allMetrics.asScala.foreach { case (metricName, metric) =>
if (metricName.name == quotaMetricName.name && metricName.group == quotaMetricName.group) {
val metricTags = metricName.tags
Option(quotaLimit(metricTags)).foreach { newQuota =>
if (newQuota != metric.config.quota.bound) {
@ -518,6 +518,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
}
}
}
}
protected def clientRateMetricName(quotaMetricTags: Map[String, String]): MetricName = {
metrics.metricName("byte-rate", quotaType.toString,

View File

@ -24,7 +24,7 @@ import org.apache.kafka.common.metrics._
import org.apache.kafka.common.utils.Time
import org.apache.kafka.server.quota.ClientQuotaCallback
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
class ClientRequestQuotaManager(private val config: ClientQuotaManagerConfig,

View File

@ -33,7 +33,7 @@ import org.apache.kafka.common.metrics.Quota
import org.apache.kafka.common.metrics.Quota._
import org.apache.kafka.common.utils.Sanitizer
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
import scala.collection.Seq
import scala.util.Try
@ -128,13 +128,13 @@ class QuotaConfigHandler(private val quotaManagers: QuotaManagers) {
val clientId = sanitizedClientId.map(Sanitizer.desanitize)
val producerQuota =
if (config.containsKey(DynamicConfig.Client.ProducerByteRateOverrideProp))
Some(new Quota(config.getProperty(DynamicConfig.Client.ProducerByteRateOverrideProp).toLong, true))
Some(new Quota(config.getProperty(DynamicConfig.Client.ProducerByteRateOverrideProp).toLong.toDouble, true))
else
None
quotaManagers.produce.updateQuota(sanitizedUser, clientId, sanitizedClientId, producerQuota)
val consumerQuota =
if (config.containsKey(DynamicConfig.Client.ConsumerByteRateOverrideProp))
Some(new Quota(config.getProperty(DynamicConfig.Client.ConsumerByteRateOverrideProp).toLong, true))
Some(new Quota(config.getProperty(DynamicConfig.Client.ConsumerByteRateOverrideProp).toLong.toDouble, true))
else
None
quotaManagers.fetch.updateQuota(sanitizedUser, clientId, sanitizedClientId, consumerQuota)
@ -197,9 +197,9 @@ class BrokerConfigHandler(private val brokerConfig: KafkaConfig,
brokerConfig.dynamicConfig.updateDefaultConfig(properties)
else if (brokerConfig.brokerId == brokerId.trim.toInt) {
brokerConfig.dynamicConfig.updateBrokerConfig(brokerConfig.brokerId, properties)
quotaManagers.leader.updateQuota(upperBound(getOrDefault(LeaderReplicationThrottledRateProp)))
quotaManagers.follower.updateQuota(upperBound(getOrDefault(FollowerReplicationThrottledRateProp)))
quotaManagers.alterLogDirs.updateQuota(upperBound(getOrDefault(ReplicaAlterLogDirsIoMaxBytesPerSecondProp)))
quotaManagers.leader.updateQuota(upperBound(getOrDefault(LeaderReplicationThrottledRateProp).toDouble))
quotaManagers.follower.updateQuota(upperBound(getOrDefault(FollowerReplicationThrottledRateProp).toDouble))
quotaManagers.alterLogDirs.updateQuota(upperBound(getOrDefault(ReplicaAlterLogDirsIoMaxBytesPerSecondProp).toDouble))
}
}
}

View File

@ -34,7 +34,7 @@ class DelayedElectLeader(
responseCallback: Map[TopicPartition, ApiError] => Unit
) extends DelayedOperation(delayMs) {
private var waitingPartitions = expectedLeaders
private val waitingPartitions = mutable.Map() ++= expectedLeaders
private val fullResults = mutable.Map() ++= results
@ -52,7 +52,7 @@ class DelayedElectLeader(
updateWaiting()
val timedOut = waitingPartitions.map {
case (tp, _) => tp -> new ApiError(Errors.REQUEST_TIMED_OUT, null)
}.toMap
}
responseCallback(timedOut ++ fullResults)
}

View File

@ -36,7 +36,7 @@ import org.apache.kafka.common.security.token.delegation.internals.DelegationTok
import org.apache.kafka.common.security.token.delegation.{DelegationToken, TokenInformation}
import org.apache.kafka.common.utils.{Sanitizer, SecurityUtils, Time}
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
import scala.collection.mutable
object DelegationTokenManager {

View File

@ -36,7 +36,7 @@ import org.apache.kafka.common.security.authenticator.LoginManager
import org.apache.kafka.common.utils.Utils
import scala.collection._
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
/**
* Dynamic broker configurations are stored in ZooKeeper and may be defined at two levels:
@ -85,8 +85,8 @@ object DynamicBrokerConfig {
SocketServer.ReconfigurableConfigs
private val ClusterLevelListenerConfigs = Set(KafkaConfig.MaxConnectionsProp)
private val PerBrokerConfigs = DynamicSecurityConfigs ++
DynamicListenerConfig.ReconfigurableConfigs -- ClusterLevelListenerConfigs
private val PerBrokerConfigs = (DynamicSecurityConfigs ++ DynamicListenerConfig.ReconfigurableConfigs).diff(
ClusterLevelListenerConfigs)
private val ListenerMechanismConfigs = Set(KafkaConfig.SaslJaasConfigProp,
KafkaConfig.SaslLoginCallbackHandlerClassProp,
KafkaConfig.SaslLoginClassProp,
@ -168,12 +168,14 @@ object DynamicBrokerConfig {
}
private[server] def addDynamicConfigs(configDef: ConfigDef): Unit = {
KafkaConfig.configKeys.filterKeys(AllDynamicConfigs.contains).values.foreach { config =>
KafkaConfig.configKeys.foreach { case (configName, config) =>
if (AllDynamicConfigs.contains(configName)) {
configDef.define(config.name, config.`type`, config.defaultValue, config.validator,
config.importance, config.documentation, config.group, config.orderInGroup, config.width,
config.displayName, config.dependents, config.recommender)
}
}
}
private[server] def dynamicConfigUpdateModes: util.Map[String, String] = {
AllDynamicConfigs.map { name =>
@ -352,7 +354,10 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
props.setProperty(configName, passwordEncoder.encode(new Password(value)))
}
}
configProps.asScala.filterKeys(isPasswordConfig).foreach { case (name, value) => encodePassword(name, value) }
configProps.asScala.foreach { case (name, value) =>
if (isPasswordConfig(name))
encodePassword(name, value)
}
props
}
@ -386,7 +391,10 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
}
}
props.asScala.filterKeys(isPasswordConfig).foreach { case (name, value) => decodePassword(name, value) }
props.asScala.foreach { case (name, value) =>
if (isPasswordConfig(name))
decodePassword(name, value)
}
props
}
@ -398,8 +406,8 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
val props = persistentProps.clone().asInstanceOf[Properties]
if (props.asScala.keySet.exists(isPasswordConfig)) {
maybeCreatePasswordEncoder(kafkaConfig.passwordEncoderOldSecret).foreach { passwordDecoder =>
persistentProps.asScala.filterKeys(isPasswordConfig).foreach { case (configName, value) =>
if (value != null) {
persistentProps.asScala.foreach { case (configName, value) =>
if (isPasswordConfig(configName) && value != null) {
val decoded = try {
Some(passwordDecoder.decode(value).value)
} catch {
@ -633,7 +641,7 @@ class DynamicLogConfig(logManager: LogManager, server: KafkaServer) extends Brok
logManager.allLogs.foreach { log =>
val props = mutable.Map.empty[Any, Any]
props ++= newBrokerDefaults
props ++= log.config.originals.asScala.filterKeys(log.config.overriddenConfigs.contains)
props ++= log.config.originals.asScala.filter { case (k, _) => log.config.overriddenConfigs.contains(k) }
val logConfig = LogConfig(props.asJava, log.config.overriddenConfigs)
log.updateConfig(logConfig)
@ -644,8 +652,8 @@ class DynamicLogConfig(logManager: LogManager, server: KafkaServer) extends Brok
val currentLogConfig = logManager.currentDefaultConfig
val origUncleanLeaderElectionEnable = logManager.currentDefaultConfig.uncleanLeaderElectionEnable
val newBrokerDefaults = new util.HashMap[String, Object](currentLogConfig.originals)
newConfig.valuesFromThisConfig.asScala.filterKeys(DynamicLogConfig.ReconfigurableConfigs.contains).foreach { case (k, v) =>
if (v != null) {
newConfig.valuesFromThisConfig.asScala.foreach { case (k, v) =>
if (DynamicLogConfig.ReconfigurableConfigs.contains(k) && v != null) {
DynamicLogConfig.KafkaConfigToLogConfigName.get(k).foreach { configName =>
newBrokerDefaults.put(configName, v.asInstanceOf[AnyRef])
}
@ -678,7 +686,8 @@ class DynamicThreadPool(server: KafkaServer) extends BrokerReconfigurable {
}
override def validateReconfiguration(newConfig: KafkaConfig): Unit = {
newConfig.values.asScala.filterKeys(DynamicThreadPool.ReconfigurableConfigs.contains).foreach { case (k, v) =>
newConfig.values.asScala.foreach { case (k, v) =>
if (DynamicThreadPool.ReconfigurableConfigs.contains(k)) {
val newValue = v.asInstanceOf[Int]
val oldValue = currentValue(k)
if (newValue != oldValue) {
@ -692,6 +701,7 @@ class DynamicThreadPool(server: KafkaServer) extends BrokerReconfigurable {
}
}
}
}
override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
if (newConfig.numIoThreads != oldConfig.numIoThreads)
@ -762,7 +772,7 @@ class DynamicMetricsReporters(brokerId: Int, server: KafkaServer) extends Reconf
override def reconfigure(configs: util.Map[String, _]): Unit = {
val updatedMetricsReporters = metricsReporterClasses(configs)
val deleted = currentReporters.keySet -- updatedMetricsReporters
val deleted = currentReporters.keySet.toSet -- updatedMetricsReporters
deleted.foreach(removeReporter)
currentReporters.values.foreach {
case reporter: Reconfigurable => dynamicConfig.maybeReconfigure(reporter, dynamicConfig.currentKafkaConfig, configs)

View File

@ -26,7 +26,7 @@ import org.apache.kafka.common.config.ConfigDef.Importance._
import org.apache.kafka.common.config.ConfigDef.Range._
import org.apache.kafka.common.config.ConfigDef.Type._
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
/**
* Class used to hold dynamic configs. These are configs which have no physical manifestation in the server.properties

View File

@ -27,7 +27,7 @@ import org.apache.kafka.common.config.types.Password
import org.apache.kafka.common.security.scram.internals.ScramMechanism
import org.apache.kafka.common.utils.Time
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
import scala.collection._
/**

View File

@ -32,7 +32,7 @@ import org.apache.kafka.common.utils.{ImplicitLinkedHashCollection, Time, Utils}
import scala.math.Ordered.orderingToOrdered
import scala.collection.{mutable, _}
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
object FetchSession {
type REQ_MAP = util.Map[TopicPartition, FetchRequest.PartitionData]

View File

@ -81,7 +81,7 @@ import org.apache.kafka.common.message.MetadataResponseData.{MetadataResponsePar
import org.apache.kafka.server.authorizer._
import scala.compat.java8.OptionConverters._
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
import scala.collection.mutable.ArrayBuffer
import scala.collection.{Map, Seq, Set, immutable, mutable}
import scala.util.{Failure, Success, Try}
@ -892,9 +892,9 @@ class KafkaApis(val requestChannel: RequestChannel,
case (topicPartition, _) => authorizedTopics.contains(topicPartition.topic)
}
val unauthorizedResponseStatus = unauthorizedRequestInfo.mapValues(_ =>
new ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, List[JLong]().asJava)
)
val unauthorizedResponseStatus = unauthorizedRequestInfo.map { case (k, _) =>
k -> new ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, Seq.empty[JLong].asJava)
}
val responseMap = authorizedRequestInfo.map {case (topicPartition, partitionData) =>
try {
@ -932,12 +932,12 @@ class KafkaApis(val requestChannel: RequestChannel,
case (topicPartition, _) => authorizedTopics.contains(topicPartition.topic)
}
val unauthorizedResponseStatus = unauthorizedRequestInfo.mapValues(_ => {
new ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED,
val unauthorizedResponseStatus = unauthorizedRequestInfo.map { case (k, _) =>
k -> new ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED,
ListOffsetResponse.UNKNOWN_TIMESTAMP,
ListOffsetResponse.UNKNOWN_OFFSET,
Optional.empty())
})
}
val responseMap = authorizedRequestInfo.map { case (topicPartition, partitionData) =>
if (offsetRequest.duplicatePartitions.contains(topicPartition)) {
@ -1081,7 +1081,7 @@ class KafkaApis(val requestChannel: RequestChannel,
if (topics.isEmpty || topicResponses.size == topics.size) {
topicResponses
} else {
val nonExistentTopics = topics -- topicResponses.map(_.name).toSet
val nonExistentTopics = topics.diff(topicResponses.map(_.name).toSet)
val responsesForNonExistentTopics = nonExistentTopics.map { topic =>
if (isInternal(topic)) {
val topicMetadata = createInternalTopic(topic)
@ -1120,8 +1120,8 @@ class KafkaApis(val requestChannel: RequestChannel,
if (metadataRequest.allowAutoTopicCreation && config.autoCreateTopicsEnable && nonExistingTopics.nonEmpty) {
if (!authorize(request, CREATE, CLUSTER, CLUSTER_NAME, logIfDenied = false)) {
val authorizedForCreateTopics = filterAuthorized(request, CREATE, TOPIC, nonExistingTopics.toSeq)
unauthorizedForCreateTopics = nonExistingTopics -- authorizedForCreateTopics
authorizedTopics --= unauthorizedForCreateTopics
unauthorizedForCreateTopics = nonExistingTopics.diff(authorizedForCreateTopics)
authorizedTopics = authorizedTopics.diff(unauthorizedForCreateTopics)
}
}
}
@ -2201,7 +2201,7 @@ class KafkaApis(val requestChannel: RequestChannel,
// the callback for sending an offset commit response
def sendResponseCallback(authorizedTopicErrors: Map[TopicPartition, Errors]): Unit = {
var combinedCommitStatus = authorizedTopicErrors ++ unauthorizedTopicErrors ++ nonExistingTopicErrors
val combinedCommitStatus = mutable.Map() ++= authorizedTopicErrors ++= unauthorizedTopicErrors ++= nonExistingTopicErrors
if (isDebugEnabled)
combinedCommitStatus.foreach { case (topicPartition, error) =>
if (error != Errors.NONE) {
@ -2216,10 +2216,8 @@ class KafkaApis(val requestChannel: RequestChannel,
// txn commit protocol >= 2 (version 2.3 and onwards) are guaranteed to have
// the fix to check for the loading error.
if (txnOffsetCommitRequest.version < 2) {
combinedCommitStatus.foreach { case (topicPartition, error) =>
if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
combinedCommitStatus += topicPartition -> Errors.COORDINATOR_NOT_AVAILABLE
}
combinedCommitStatus ++= combinedCommitStatus.collect {
case (tp, error) if error == Errors.COORDINATOR_LOAD_IN_PROGRESS => tp -> Errors.COORDINATOR_NOT_AVAILABLE
}
}
@ -2370,9 +2368,10 @@ class KafkaApis(val requestChannel: RequestChannel,
}
val endOffsetsForAuthorizedPartitions = replicaManager.lastOffsetForLeaderEpoch(authorizedPartitions)
val endOffsetsForUnauthorizedPartitions = unauthorizedPartitions.mapValues(_ =>
new EpochEndOffset(Errors.TOPIC_AUTHORIZATION_FAILED, EpochEndOffset.UNDEFINED_EPOCH,
EpochEndOffset.UNDEFINED_EPOCH_OFFSET))
val endOffsetsForUnauthorizedPartitions = unauthorizedPartitions.map { case (k, _) =>
k -> new EpochEndOffset(Errors.TOPIC_AUTHORIZATION_FAILED, EpochEndOffset.UNDEFINED_EPOCH,
EpochEndOffset.UNDEFINED_EPOCH_OFFSET)
}
val endOffsetsForAllPartitions = endOffsetsForAuthorizedPartitions ++ endOffsetsForUnauthorizedPartitions
sendResponseMaybeThrottle(request, requestThrottleMs =>
@ -2844,8 +2843,9 @@ class KafkaApis(val requestChannel: RequestChannel,
val describeClientQuotasRequest = request.body[DescribeClientQuotasRequest]
if (authorize(request, DESCRIBE_CONFIGS, CLUSTER, CLUSTER_NAME)) {
val result = adminManager.describeClientQuotas(
describeClientQuotasRequest.filter).mapValues(_.mapValues(Double.box).toMap.asJava).toMap.asJava
val result = adminManager.describeClientQuotas(describeClientQuotasRequest.filter).map { case (quotaEntity, quotaConfigs) =>
quotaEntity -> quotaConfigs.map { case (key, value) => key -> Double.box(value) }.asJava
}.asJava
sendResponseMaybeThrottle(request, requestThrottleMs =>
new DescribeClientQuotasResponse(result, requestThrottleMs))
} else {
@ -2890,15 +2890,16 @@ class KafkaApis(val requestChannel: RequestChannel,
logIfDenied: Boolean = true): Set[String] = {
authorizer match {
case Some(authZ) =>
val resources = resourceNames.groupBy(identity).mapValues(_.size).toList
val actions = resources.map { case (resourceName, count) =>
val groupedResourceNames = resourceNames.groupBy(identity)
val actions = resourceNames.map { resourceName =>
val count = groupedResourceNames(resourceName).size
val resource = new ResourcePattern(resourceType, resourceName, PatternType.LITERAL)
new Action(operation, resource, count, logIfAllowed, logIfDenied)
}
authZ.authorize(request.context, actions.asJava).asScala
.zip(resources.map(_._1)) // zip with resource name
.filter(_._1 == AuthorizationResult.ALLOWED) // filter authorized resources
.map(_._2).toSet
.zip(resourceNames)
.filter { case (authzResult, _) => authzResult == AuthorizationResult.ALLOWED }
.map { case (_, resourceName) => resourceName }.toSet
case None =>
resourceNames.toSet
}

View File

@ -43,7 +43,7 @@ import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.zookeeper.client.ZKClientConfig
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
import scala.collection.{Map, Seq}
object Defaults {
@ -239,7 +239,7 @@ object Defaults {
/** ********* Sasl configuration ***********/
val SaslMechanismInterBrokerProtocol = SaslConfigs.DEFAULT_SASL_MECHANISM
val SaslEnabledMechanisms = SaslConfigs.DEFAULT_SASL_ENABLED_MECHANISMS
val SaslEnabledMechanisms = BrokerSecurityConfigs.DEFAULT_SASL_ENABLED_MECHANISMS
val SaslKerberosKinitCmd = SaslConfigs.DEFAULT_KERBEROS_KINIT_CMD
val SaslKerberosTicketRenewWindowFactor = SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_WINDOW_FACTOR
val SaslKerberosTicketRenewJitter = SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_JITTER

View File

@ -28,7 +28,7 @@ import org.apache.kafka.common.internals.FatalExitError
import org.apache.kafka.common.utils.{KafkaThread, Time}
import scala.collection.mutable
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
/**
* A thread that answers kafka requests.

View File

@ -50,7 +50,7 @@ import org.apache.kafka.common.{ClusterResource, Endpoint, Node}
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.zookeeper.client.ZKClientConfig
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
import scala.collection.{Map, Seq, mutable}
object KafkaServer {
@ -307,9 +307,13 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
authorizer.foreach(_.configure(config.originals))
val authorizerFutures: Map[Endpoint, CompletableFuture[Void]] = authorizer match {
case Some(authZ) =>
authZ.start(brokerInfo.broker.toServerInfo(clusterId, config)).asScala.mapValues(_.toCompletableFuture).toMap
authZ.start(brokerInfo.broker.toServerInfo(clusterId, config)).asScala.map { case (ep, cs) =>
ep -> cs.toCompletableFuture
}
case None =>
brokerInfo.broker.endPoints.map { ep => ep.toJava -> CompletableFuture.completedFuture[Void](null) }.toMap
brokerInfo.broker.endPoints.map { ep =>
ep.toJava -> CompletableFuture.completedFuture[Void](null)
}.toMap
}
val fetchManager = new FetchManager(Time.SYSTEM,
@ -726,7 +730,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
}
if (brokerMetadataSet.size > 1) {
val builder = StringBuilder.newBuilder
val builder = new StringBuilder
for ((logDir, brokerMetadata) <- brokerMetadataMap)
builder ++= s"- $logDir -> $brokerMetadata\n"

View File

@ -22,7 +22,7 @@ import java.util.{Collections}
import java.util.concurrent.locks.ReentrantReadWriteLock
import scala.collection.{mutable, Seq, Set}
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
import kafka.cluster.{Broker, EndPoint}
import kafka.api._
import kafka.controller.StateChangeLogger
@ -199,7 +199,7 @@ class MetadataCache(brokerId: Int) extends Logging {
}
def getNonExistingTopics(topics: Set[String]): Set[String] = {
topics -- metadataSnapshot.partitionStates.keySet
topics.diff(metadataSnapshot.partitionStates.keySet)
}
def getAliveBroker(brokerId: Int): Option[Broker] = {

View File

@ -34,7 +34,7 @@ import org.apache.kafka.common.requests.EpochEndOffset._
import org.apache.kafka.common.requests.FetchResponse.PartitionData
import org.apache.kafka.common.requests.{EpochEndOffset, FetchRequest, FetchResponse}
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
import scala.collection.{mutable, Map, Seq, Set}
class ReplicaAlterLogDirsThread(name: String,

View File

@ -29,7 +29,7 @@ import org.apache.kafka.clients.{ApiVersions, ClientResponse, ManualMetadataUpda
import org.apache.kafka.common.{Node, Reconfigurable}
import org.apache.kafka.common.requests.AbstractRequest.Builder
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
trait BlockingSend {

View File

@ -34,7 +34,7 @@ import org.apache.kafka.common.requests.EpochEndOffset._
import org.apache.kafka.common.requests._
import org.apache.kafka.common.utils.{LogContext, Time}
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
import scala.collection.{mutable, Map}
class ReplicaFetcherThread(name: String,

View File

@ -57,7 +57,7 @@ import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.requests._
import org.apache.kafka.common.utils.Time
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
import scala.collection.{Map, Seq, Set, mutable}
import scala.compat.java8.OptionConverters._
@ -181,7 +181,7 @@ class ReplicaManager(val config: KafkaConfig,
brokerTopicStats: BrokerTopicStats,
metadataCache: MetadataCache,
logDirFailureChannel: LogDirFailureChannel,
threadNamePrefix: Option[String] = None) {
threadNamePrefix: Option[String] = None) = {
this(config, metrics, time, zkClient, scheduler, logManager, isShuttingDown,
quotaManagers, brokerTopicStats, metadataCache, logDirFailureChannel,
DelayedOperationPurgatory[DelayedProduce](
@ -1256,14 +1256,14 @@ class ReplicaManager(val config: KafkaConfig,
}
}
val partitionsTobeLeader = partitionStates.filter { case (_, partitionState) =>
val partitionsToBeLeader = partitionStates.filter { case (_, partitionState) =>
partitionState.leader == localBrokerId
}
val partitionsToBeFollower = partitionStates -- partitionsTobeLeader.keys
val partitionsToBeFollower = partitionStates.filter { case (k, _) => !partitionsToBeLeader.contains(k) }
val highWatermarkCheckpoints = new LazyOffsetCheckpoints(this.highWatermarkCheckpoints)
val partitionsBecomeLeader = if (partitionsTobeLeader.nonEmpty)
makeLeaders(controllerId, controllerEpoch, partitionsTobeLeader, correlationId, responseMap,
val partitionsBecomeLeader = if (partitionsToBeLeader.nonEmpty)
makeLeaders(controllerId, controllerEpoch, partitionsToBeLeader, correlationId, responseMap,
highWatermarkCheckpoints)
else
Set.empty[Partition]

View File

@ -134,7 +134,7 @@ class ReplicationQuotaManager(val config: ReplicationQuotaManagerConfig,
*/
def record(value: Long): Unit = {
try {
sensor().record(value)
sensor().record(value.toDouble)
} catch {
case qve: QuotaViolationException =>
trace(s"Record: Quota violated, but ignored, for sensor (${sensor.name}), metric: (${qve.metricName}), value : (${qve.value}), bound: (${qve.bound}), recordedValue ($value)")

View File

@ -37,7 +37,7 @@ import org.apache.kafka.common.requests.ListOffsetRequest
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, Deserializer}
import org.apache.kafka.common.utils.Utils
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
/**
* Consumer that dumps messages to standard out.

View File

@ -31,7 +31,7 @@ import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, Produce
import org.apache.kafka.common.KafkaException
import org.apache.kafka.common.utils.Utils
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
object ConsoleProducer {

View File

@ -30,7 +30,7 @@ import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.{Metric, MetricName, TopicPartition}
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
import scala.collection.mutable
/**
@ -175,7 +175,7 @@ object ConsumerPerformance extends LazyLogging {
startMs: Long,
endMs: Long,
dateFormat: SimpleDateFormat): Unit = {
val elapsedMs: Double = endMs - startMs
val elapsedMs: Double = (endMs - startMs).toDouble
val totalMbRead = (bytesRead * 1.0) / (1024 * 1024)
val intervalMbRead = ((bytesRead - lastBytesRead) * 1.0) / (1024 * 1024)
val intervalMbPerSec = 1000.0 * intervalMbRead / elapsedMs

View File

@ -27,7 +27,7 @@ import kafka.utils._
import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.Utils
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

View File

@ -29,7 +29,7 @@ import org.apache.kafka.clients.producer._
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.utils.Utils
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
import scala.util.Random
@ -147,7 +147,7 @@ object EndToEndLatency {
//Report progress
if (i % 1000 == 0)
println(i + "\t" + elapsed / 1000.0 / 1000.0)
println(i.toString + "\t" + elapsed / 1000.0 / 1000.0)
totalTime += elapsed
latencies(i) = elapsed / 1000 / 1000
}

View File

@ -27,7 +27,7 @@ import org.apache.kafka.common.{PartitionInfo, TopicPartition}
import org.apache.kafka.common.requests.ListOffsetRequest
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
import scala.collection.Seq
object GetOffsetShell {
@ -143,7 +143,7 @@ object GetOffsetShell {
* Return the partition infos for `topic`. If the topic does not exist, `None` is returned.
*/
private def listPartitionInfos(consumer: KafkaConsumer[_, _], topic: String, partitionIds: Set[Int]): Option[Seq[PartitionInfo]] = {
val partitionInfos = consumer.listTopics.asScala.filterKeys(_ == topic).values.flatMap(_.asScala).toBuffer
val partitionInfos = consumer.listTopics.asScala.filter { case (k, _) => k == topic }.values.flatMap(_.asScala).toBuffer
if (partitionInfos.isEmpty)
None
else if (partitionIds.isEmpty)

View File

@ -26,7 +26,7 @@ import javax.rmi.ssl.SslRMIClientSocketFactory
import joptsimple.OptionParser
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
import scala.collection.mutable
import scala.math._
import kafka.utils.{CommandLineUtils, Exit, Logging}
@ -258,8 +258,8 @@ object JmxTool extends Logging {
attributesWhitelist match {
case Some(allowedAttributes) =>
if (allowedAttributes.contains(attr.getName))
attributes(name + ":" + attr.getName) = attr.getValue
case None => attributes(name + ":" + attr.getName) = attr.getValue
attributes(name.toString + ":" + attr.getName) = attr.getValue
case None => attributes(name.toString + ":" + attr.getName) = attr.getValue
}
}
}

View File

@ -36,7 +36,7 @@ import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySe
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{KafkaException, TopicPartition}
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
import scala.collection.mutable.HashMap
import scala.util.control.ControlThrowable
import scala.util.{Failure, Success, Try}
@ -134,7 +134,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
case _: TimeoutException =>
Try(consumerWrapper.consumer.listTopics) match {
case Success(visibleTopics) =>
consumerWrapper.offsets.retain((tp, _) => visibleTopics.containsKey(tp.topic))
consumerWrapper.offsets --= consumerWrapper.offsets.keySet.filter(tp => !visibleTopics.containsKey(tp.topic))
case Failure(e) =>
warn("Failed to list all authorized topics after committing offsets timed out: ", e)
}

View File

@ -42,7 +42,7 @@ import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.utils.{LogContext, Time}
import org.apache.kafka.common.{Node, TopicPartition}
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
import scala.collection.Seq
/**
@ -225,16 +225,16 @@ object ReplicaVerificationTool extends Logging {
}
private def initialOffsets(topicPartitions: Seq[TopicPartition], consumerConfig: Properties,
initialOffsetTime: Long): Map[TopicPartition, Long] = {
initialOffsetTime: Long): collection.Map[TopicPartition, Long] = {
val consumer = createConsumer(consumerConfig)
try {
if (ListOffsetRequest.LATEST_TIMESTAMP == initialOffsetTime)
consumer.endOffsets(topicPartitions.asJava).asScala.mapValues(_.longValue).toMap
consumer.endOffsets(topicPartitions.asJava).asScala.map { case (k, v) => k -> v.longValue }
else if (ListOffsetRequest.EARLIEST_TIMESTAMP == initialOffsetTime)
consumer.beginningOffsets(topicPartitions.asJava).asScala.mapValues(_.longValue).toMap
consumer.beginningOffsets(topicPartitions.asJava).asScala.map { case (k, v) => k -> v.longValue }
else {
val timestampsToSearch = topicPartitions.map(tp => tp -> (initialOffsetTime: java.lang.Long)).toMap
consumer.offsetsForTimes(timestampsToSearch.asJava).asScala.mapValues(v => v.offset).toMap
consumer.offsetsForTimes(timestampsToSearch.asJava).asScala.map { case (k, v) => k -> v.offset }
}
} finally consumer.close()
}
@ -256,8 +256,8 @@ private case class TopicPartitionReplica(topic: String, partitionId: Int, replic
private case class MessageInfo(replicaId: Int, offset: Long, nextOffset: Long, checksum: Long)
private class ReplicaBuffer(expectedReplicasPerTopicPartition: Map[TopicPartition, Int],
initialOffsets: Map[TopicPartition, Long],
private class ReplicaBuffer(expectedReplicasPerTopicPartition: collection.Map[TopicPartition, Int],
initialOffsets: collection.Map[TopicPartition, Long],
expectedNumFetchers: Int,
reportInterval: Long) extends Logging {
private val fetchOffsetMap = new Pool[TopicPartition, Long]
@ -358,8 +358,8 @@ private class ReplicaBuffer(expectedReplicasPerTopicPartition: Map[TopicPartitio
if (isMessageInAllReplicas) {
val nextOffset = messageInfoFromFirstReplicaOpt.get.nextOffset
fetchOffsetMap.put(topicPartition, nextOffset)
debug(expectedReplicasPerTopicPartition(topicPartition) + " replicas match at offset " +
nextOffset + " for " + topicPartition)
debug(s"${expectedReplicasPerTopicPartition(topicPartition)} replicas match at offset " +
s"$nextOffset for $topicPartition")
}
}
if (maxHw - fetchOffsetMap.get(topicPartition) > maxLag) {

View File

@ -240,7 +240,7 @@ object CoreUtils {
/**
* Returns a list of duplicated items
*/
def duplicates[T](s: Traversable[T]): Iterable[T] = {
def duplicates[T](s: Iterable[T]): Iterable[T] = {
s.groupBy(identity)
.map { case (k, l) => (k, l.size)}
.filter { case (_, l) => l > 1 }

View File

@ -20,7 +20,7 @@ package kafka.utils
import java.util
import java.util.Properties
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
/**
* In order to have these implicits in scope, add the following import:

View File

@ -63,7 +63,7 @@ object Json {
catch { case _: JsonProcessingException => None }
def tryParseBytes(input: Array[Byte]): Either[JsonProcessingException, JsonValue] =
try Right(mapper.readTree(input)).right.map(JsonValue(_))
try Right(mapper.readTree(input)).map(JsonValue(_))
catch { case e: JsonProcessingException => Left(e) }
/**

View File

@ -23,7 +23,7 @@ import java.util.Locale
import org.apache.log4j.{Level, LogManager, Logger}
import scala.collection.mutable
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
object Log4jController {

View File

@ -21,8 +21,8 @@ import java.util.concurrent._
import org.apache.kafka.common.KafkaException
import collection.mutable
import collection.JavaConverters._
import collection.Set
import scala.jdk.CollectionConverters._
class Pool[K,V](valueFactory: Option[K => V] = None) extends Iterable[(K, V)] {
@ -69,7 +69,7 @@ class Pool[K,V](valueFactory: Option[K => V] = None) extends Iterable[(K, V)] {
def remove(key: K, value: V): Boolean = pool.remove(key, value)
def keys: mutable.Set[K] = pool.keySet.asScala
def keys: Set[K] = pool.keySet.asScala
def values: Iterable[V] = pool.values.asScala

View File

@ -21,7 +21,7 @@ import java.util.Properties
import java.util.Collections
import scala.collection._
import kafka.message.{CompressionCodec, NoCompressionCodec}
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
class VerifiableProperties(val props: Properties) extends Logging {

View File

@ -19,7 +19,7 @@ package kafka.utils.json
import scala.collection.{Map, Seq}
import scala.collection.compat._
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
import com.fasterxml.jackson.databind.{JsonMappingException, JsonNode}
@ -82,7 +82,7 @@ object DecodeJson {
implicit def decodeOption[E](implicit decodeJson: DecodeJson[E]): DecodeJson[Option[E]] = (node: JsonNode) => {
if (node.isNull) Right(None)
else decodeJson.decodeEither(node).right.map(Some(_))
else decodeJson.decodeEither(node).map(Some(_))
}
implicit def decodeSeq[E, S[+T] <: Seq[E]](implicit decodeJson: DecodeJson[E], factory: Factory[E, S[E]]): DecodeJson[S[E]] = (node: JsonNode) => {
@ -93,7 +93,7 @@ object DecodeJson {
implicit def decodeMap[V, M[K, +V] <: Map[K, V]](implicit decodeJson: DecodeJson[V], factory: Factory[(String, V), M[String, V]]): DecodeJson[M[String, V]] = (node: JsonNode) => {
if (node.isObject)
decodeIterator(node.fields.asScala)(e => decodeJson.decodeEither(e.getValue).right.map(v => (e.getKey, v)))
decodeIterator(node.fields.asScala)(e => decodeJson.decodeEither(e.getValue).map(v => (e.getKey, v)))
else Left(s"Expected JSON object, received $node")
}

View File

@ -17,8 +17,8 @@
package kafka.utils.json
import scala.collection.{Iterator, JavaConverters}
import JavaConverters._
import scala.collection.Iterator
import scala.jdk.CollectionConverters._
import com.fasterxml.jackson.databind.node.ArrayNode

View File

@ -19,7 +19,7 @@ package kafka.utils.json
import com.fasterxml.jackson.databind.JsonMappingException
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
import com.fasterxml.jackson.databind.node.ObjectNode

View File

@ -93,7 +93,8 @@ class AdminZkClient(zkClient: KafkaZkClient) extends Logging {
zkClient.setOrCreateEntityConfigs(ConfigType.Topic, topic, config)
// create the partition assignment
writeTopicPartitionAssignment(topic, partitionReplicaAssignment.mapValues(ReplicaAssignment(_)).toMap, isUpdate = false)
writeTopicPartitionAssignment(topic, partitionReplicaAssignment.map { case (k, v) => k -> ReplicaAssignment(v) },
isUpdate = false)
}
/**
@ -140,7 +141,7 @@ class AdminZkClient(zkClient: KafkaZkClient) extends Logging {
val assignment = replicaAssignment.map { case (partitionId, replicas) => (new TopicPartition(topic,partitionId), replicas) }.toMap
if (!isUpdate) {
zkClient.createTopicAssignment(topic, assignment.mapValues(_.replicas).toMap)
zkClient.createTopicAssignment(topic, assignment.map { case (k, v) => k -> v.replicas })
} else {
zkClient.setTopicAssignment(topic, assignment)
}
@ -218,7 +219,7 @@ class AdminZkClient(zkClient: KafkaZkClient) extends Logging {
writeTopicPartitionAssignment(topic, proposedAssignment, isUpdate = true)
}
proposedAssignment.mapValues(_.replicas).toMap
proposedAssignment.map { case (k, v) => k -> v.replicas }
}
private def validateReplicaAssignment(replicaAssignment: Map[Int, Seq[Int]],

Some files were not shown because too many files have changed in this diff Show More