mirror of https://github.com/apache/kafka.git
MINOR: Fix Scala 2.13 compiler warnings (#8390)
Once Scala 2.13.2 is officially released, I will submit a follow up PR that enables `-Xfatal-warnings` with the necessary warning exclusions. Compiler warning exclusions were only introduced in 2.13.2 and hence why we have to wait for that. I used a snapshot build to test it in the meantime. Changes: * Remove Deprecated annotation from internal request classes * Class.newInstance is deprecated in favor of Class.getConstructor().newInstance * Replace deprecated JavaConversions with CollectionConverters * Remove unused kafka.cluster.Cluster * Don't use Map and Set methods deprecated in 2.13: - collection.Map +, ++, -, --, mapValues, filterKeys, retain - collection.Set +, ++, -, -- * Add scala-collection-compat dependency to streams-scala and update version to 2.1.4. * Replace usages of deprecated Either.get and Either.right * Replace usage of deprecated Integer(String) constructor * `import scala.language.implicitConversions` is not needed in Scala 2.13 * Replace usage of deprecated `toIterator`, `Traversable`, `seq`, `reverseMap`, `hasDefiniteSize` * Replace usage of deprecated alterConfigs with incrementalAlterConfigs where possible * Fix implicit widening conversions from Long/Int to Double/Float * Avoid implicit conversions to String * Eliminate usage of deprecated procedure syntax * Remove `println`in `LogValidatorTest` instead of fixing the compiler warning since tests should not `println`. * Eliminate implicit conversion from Array to Seq * Remove unnecessary usage of 3 argument assertEquals * Replace `toStream` with `iterator` * Do not use deprecated SaslConfigs.DEFAULT_SASL_ENABLED_MECHANISMS * Replace StringBuilder.newBuilder with new StringBuilder * Rename AclBuffers to AclSeqs and remove usage of `filterKeys` * More consistent usage of Set/Map in Controller classes: this also fixes deprecated warnings with Scala 2.13 * Add spotBugs exclusion for inliner artifact in KafkaApis with Scala 2.12. Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
parent
cb9125106d
commit
90bbeedf52
|
@ -1302,6 +1302,7 @@ project(':streams:streams-scala') {
|
|||
compile project(':streams')
|
||||
|
||||
compile libs.scalaLibrary
|
||||
compile libs.scalaCollectionCompat
|
||||
|
||||
testCompile project(':core')
|
||||
testCompile project(':core').sourceSets.test.output
|
||||
|
|
|
@ -186,7 +186,6 @@ public class ListOffsetRequest extends AbstractRequest {
|
|||
|
||||
public static final class PartitionData {
|
||||
public final long timestamp;
|
||||
@Deprecated
|
||||
public final int maxNumOffsets; // only supported in v0
|
||||
public final Optional<Integer> currentLeaderEpoch;
|
||||
|
||||
|
@ -196,7 +195,7 @@ public class ListOffsetRequest extends AbstractRequest {
|
|||
this.currentLeaderEpoch = currentLeaderEpoch;
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
// For V0
|
||||
public PartitionData(long timestamp, int maxNumOffsets) {
|
||||
this(timestamp, maxNumOffsets, Optional.empty());
|
||||
}
|
||||
|
|
|
@ -137,7 +137,6 @@ public class ListOffsetResponse extends AbstractResponse {
|
|||
public static final class PartitionData {
|
||||
public final Errors error;
|
||||
// The offsets list is only used in ListOffsetResponse v0.
|
||||
@Deprecated
|
||||
public final List<Long> offsets;
|
||||
public final Long timestamp;
|
||||
public final Long offset;
|
||||
|
@ -146,7 +145,6 @@ public class ListOffsetResponse extends AbstractResponse {
|
|||
/**
|
||||
* Constructor for ListOffsetResponse v0
|
||||
*/
|
||||
@Deprecated
|
||||
public PartitionData(Errors error, List<Long> offsets) {
|
||||
this.error = error;
|
||||
this.offsets = offsets;
|
||||
|
|
|
@ -39,9 +39,7 @@ public class OffsetCommitRequest extends AbstractRequest {
|
|||
public static final String DEFAULT_MEMBER_ID = "";
|
||||
public static final long DEFAULT_RETENTION_TIME = -1L;
|
||||
|
||||
// default values for old versions,
|
||||
// will be removed after these versions are deprecated
|
||||
@Deprecated
|
||||
// default values for old versions, will be removed after these versions are no longer supported
|
||||
public static final long DEFAULT_TIMESTAMP = -1L; // for V0, V1
|
||||
|
||||
private final OffsetCommitRequestData data;
|
||||
|
|
|
@ -82,7 +82,8 @@ public class SecurityUtils {
|
|||
try {
|
||||
String[] securityProviderClasses = securityProviderClassesStr.replaceAll("\\s+", "").split(",");
|
||||
for (int index = 0; index < securityProviderClasses.length; index++) {
|
||||
SecurityProviderCreator securityProviderCreator = (SecurityProviderCreator) Class.forName(securityProviderClasses[index]).newInstance();
|
||||
SecurityProviderCreator securityProviderCreator =
|
||||
(SecurityProviderCreator) Class.forName(securityProviderClasses[index]).getConstructor().newInstance();
|
||||
securityProviderCreator.configure(configs);
|
||||
Security.insertProviderAt(securityProviderCreator.getProvider(), index + 1);
|
||||
}
|
||||
|
@ -91,7 +92,7 @@ public class SecurityUtils {
|
|||
" are expected to be sub-classes of SecurityProviderCreator");
|
||||
} catch (ClassNotFoundException cnfe) {
|
||||
LOGGER.error("Unrecognized security provider creator class", cnfe);
|
||||
} catch (IllegalAccessException | InstantiationException e) {
|
||||
} catch (ReflectiveOperationException e) {
|
||||
LOGGER.error("Unexpected implementation of security provider creator class", e);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -648,7 +648,7 @@ public final class MessageTest {
|
|||
message.write(byteBufferAccessor, cache, version);
|
||||
assertEquals("The result of the size function does not match the number of bytes " +
|
||||
"written for version " + version, size, buf.position());
|
||||
Message message2 = message.getClass().newInstance();
|
||||
Message message2 = message.getClass().getConstructor().newInstance();
|
||||
buf.flip();
|
||||
message2.read(byteBufferAccessor, version);
|
||||
assertEquals("The result of the size function does not match the number of bytes " +
|
||||
|
@ -661,7 +661,7 @@ public final class MessageTest {
|
|||
|
||||
private void testStructRoundTrip(short version, Message message, Message expected) throws Exception {
|
||||
Struct struct = message.toStruct(version);
|
||||
Message message2 = message.getClass().newInstance();
|
||||
Message message2 = message.getClass().getConstructor().newInstance();
|
||||
message2.fromStruct(struct, version);
|
||||
assertEquals(expected, message2);
|
||||
assertEquals(expected.hashCode(), message2.hashCode());
|
||||
|
|
|
@ -331,7 +331,7 @@ public class ConnectorConfig extends AbstractConfig {
|
|||
}
|
||||
Transformation transformation;
|
||||
try {
|
||||
transformation = transformationCls.asSubclass(Transformation.class).newInstance();
|
||||
transformation = transformationCls.asSubclass(Transformation.class).getConstructor().newInstance();
|
||||
} catch (Exception e) {
|
||||
throw new ConfigException(key, String.valueOf(transformationCls), "Error getting config definition from Transformation: " + e.getMessage());
|
||||
}
|
||||
|
|
|
@ -524,8 +524,8 @@ public class AbstractHerderTest {
|
|||
EasyMock.expect(worker.getPlugins()).andStubReturn(plugins);
|
||||
final Connector connector;
|
||||
try {
|
||||
connector = connectorClass.newInstance();
|
||||
} catch (InstantiationException | IllegalAccessException e) {
|
||||
connector = connectorClass.getConstructor().newInstance();
|
||||
} catch (ReflectiveOperationException e) {
|
||||
throw new RuntimeException("Couldn't create connector", e);
|
||||
}
|
||||
EasyMock.expect(plugins.newConnector(connectorClass.getName())).andReturn(connector);
|
||||
|
|
|
@ -479,7 +479,7 @@ public class ConnectorPluginsResourceTest {
|
|||
public MockConnectorPluginDesc(Class<? extends Connector> klass) throws Exception {
|
||||
super(
|
||||
klass,
|
||||
klass.newInstance().version(),
|
||||
klass.getConstructor().newInstance().version(),
|
||||
new MockPluginClassLoader(null, new URL[0])
|
||||
);
|
||||
}
|
||||
|
|
|
@ -25,7 +25,7 @@ import kafka.server.{KafkaServer, KafkaServerStartable}
|
|||
import kafka.utils.{CommandLineUtils, Exit, Logging}
|
||||
import org.apache.kafka.common.utils.{Java, LoggingSignalHandler, OperatingSystem, Utils}
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.jdk.CollectionConverters._
|
||||
|
||||
object Kafka extends Logging {
|
||||
|
||||
|
|
|
@ -34,7 +34,7 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal
|
|||
import org.apache.kafka.common.utils.{Utils, SecurityUtils => JSecurityUtils}
|
||||
import org.apache.kafka.server.authorizer.Authorizer
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.jdk.CollectionConverters._
|
||||
import scala.compat.java8.OptionConverters._
|
||||
import scala.collection.mutable
|
||||
import scala.io.StdIn
|
||||
|
@ -147,8 +147,9 @@ object AclCommand extends Logging {
|
|||
} else {
|
||||
listPrincipals.foreach(principal => {
|
||||
println(s"ACLs for principal `$principal`")
|
||||
val filteredResourceToAcls = resourceToAcls.mapValues(acls =>
|
||||
acls.filter(acl => principal.toString.equals(acl.principal))).filter(entry => entry._2.nonEmpty)
|
||||
val filteredResourceToAcls = resourceToAcls.map { case (resource, acls) =>
|
||||
resource -> acls.filter(acl => principal.toString.equals(acl.principal))
|
||||
}.filter { case (_, acls) => acls.nonEmpty }
|
||||
|
||||
for ((resource, acls) <- filteredResourceToAcls)
|
||||
println(s"Current ACLs for resource `$resource`: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline")
|
||||
|
@ -263,8 +264,9 @@ object AclCommand extends Logging {
|
|||
} else {
|
||||
listPrincipals.foreach(principal => {
|
||||
println(s"ACLs for principal `$principal`")
|
||||
val filteredResourceToAcls = resourceToAcls.mapValues(acls =>
|
||||
acls.filter(acl => principal.toString.equals(acl.principal))).filter(entry => entry._2.nonEmpty)
|
||||
val filteredResourceToAcls = resourceToAcls.map { case (resource, acls) =>
|
||||
resource -> acls.filter(acl => principal.toString.equals(acl.principal))
|
||||
}.filter { case (_, acls) => acls.nonEmpty }
|
||||
|
||||
for ((resource, acls) <- filteredResourceToAcls)
|
||||
println(s"Current ACLs for resource `$resource`: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline")
|
||||
|
|
|
@ -42,7 +42,7 @@ import org.apache.kafka.common.Node
|
|||
import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionsResponseKeyCollection
|
||||
import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, ApiVersionsRequest, ApiVersionsResponse, MetadataRequest, MetadataResponse}
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.jdk.CollectionConverters._
|
||||
import scala.util.{Failure, Success, Try}
|
||||
|
||||
/**
|
||||
|
|
|
@ -39,10 +39,9 @@ import org.apache.kafka.common.security.scram.internals.{ScramCredentialUtils, S
|
|||
import org.apache.kafka.common.utils.{Sanitizer, Time, Utils}
|
||||
import org.apache.zookeeper.client.ZKClientConfig
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.jdk.CollectionConverters._
|
||||
import scala.collection._
|
||||
|
||||
|
||||
/**
|
||||
* This script can be used to change configs for topics/clients/users/brokers dynamically
|
||||
* An entity described or altered by the command may be one of:
|
||||
|
|
|
@ -31,9 +31,9 @@ import org.apache.kafka.clients.CommonClientConfigs
|
|||
import org.apache.kafka.common.utils.Utils
|
||||
import org.apache.kafka.common.{KafkaException, Node, TopicPartition}
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.jdk.CollectionConverters._
|
||||
import scala.collection.mutable.ListBuffer
|
||||
import scala.collection.{Map, Seq, Set, immutable, mutable}
|
||||
import scala.collection.{Map, Seq, immutable, mutable}
|
||||
import scala.util.{Failure, Success, Try}
|
||||
import joptsimple.OptionSpec
|
||||
import org.apache.kafka.common.protocol.Errors
|
||||
|
@ -403,7 +403,7 @@ object ConsumerGroupCommand extends Logging {
|
|||
}
|
||||
|
||||
def deleteOffsets(groupId: String, topics: List[String]): (Errors, Map[TopicPartition, Throwable]) = {
|
||||
var partitionLevelResult: Map[TopicPartition, Throwable] = mutable.HashMap()
|
||||
val partitionLevelResult = mutable.Map[TopicPartition, Throwable]()
|
||||
|
||||
val (topicWithPartitions, topicWithoutPartitions) = topics.partition(_.contains(":"))
|
||||
|
||||
|
@ -527,16 +527,17 @@ object ConsumerGroupCommand extends Logging {
|
|||
partitionOffsets, Some(s"${consumerSummary.consumerId}"), Some(s"${consumerSummary.host}"),
|
||||
Some(s"${consumerSummary.clientId}"))
|
||||
}
|
||||
val rowsWithoutConsumer = committedOffsets.filterKeys(!assignedTopicPartitions.contains(_)).flatMap {
|
||||
case (topicPartition, offset) =>
|
||||
collectConsumerAssignment(
|
||||
groupId,
|
||||
Option(consumerGroup.coordinator),
|
||||
Seq(topicPartition),
|
||||
Map(topicPartition -> Some(offset.offset)),
|
||||
Some(MISSING_COLUMN_VALUE),
|
||||
Some(MISSING_COLUMN_VALUE),
|
||||
Some(MISSING_COLUMN_VALUE)).toSeq
|
||||
val rowsWithoutConsumer = committedOffsets.filter { case (tp, _) =>
|
||||
!assignedTopicPartitions.contains(tp)
|
||||
}.flatMap { case (topicPartition, offset) =>
|
||||
collectConsumerAssignment(
|
||||
groupId,
|
||||
Option(consumerGroup.coordinator),
|
||||
Seq(topicPartition),
|
||||
Map(topicPartition -> Some(offset.offset)),
|
||||
Some(MISSING_COLUMN_VALUE),
|
||||
Some(MISSING_COLUMN_VALUE),
|
||||
Some(MISSING_COLUMN_VALUE)).toSeq
|
||||
}
|
||||
groupId -> (Some(state.toString), Some(rowsWithConsumer ++ rowsWithoutConsumer))
|
||||
}).toMap
|
||||
|
@ -862,10 +863,10 @@ object ConsumerGroupCommand extends Logging {
|
|||
withTimeoutMs(new DeleteConsumerGroupsOptions)
|
||||
).deletedGroups().asScala
|
||||
|
||||
val result = groupsToDelete.mapValues { f =>
|
||||
val result = groupsToDelete.map { case (g, f) =>
|
||||
Try(f.get) match {
|
||||
case _: Success[_] => null
|
||||
case Failure(e) => e
|
||||
case Success(_) => g -> null
|
||||
case Failure(e) => g -> e
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1009,11 +1010,11 @@ object ConsumerGroupCommand extends Logging {
|
|||
|
||||
options = parser.parse(args : _*)
|
||||
|
||||
val allGroupSelectionScopeOpts: Set[OptionSpec[_]] = Set(groupOpt, allGroupsOpt)
|
||||
val allConsumerGroupLevelOpts: Set[OptionSpec[_]] = Set(listOpt, describeOpt, deleteOpt, resetOffsetsOpt)
|
||||
val allResetOffsetScenarioOpts: Set[OptionSpec[_]] = Set(resetToOffsetOpt, resetShiftByOpt,
|
||||
val allGroupSelectionScopeOpts = immutable.Set[OptionSpec[_]](groupOpt, allGroupsOpt)
|
||||
val allConsumerGroupLevelOpts = immutable.Set[OptionSpec[_]](listOpt, describeOpt, deleteOpt, resetOffsetsOpt)
|
||||
val allResetOffsetScenarioOpts = immutable.Set[OptionSpec[_]](resetToOffsetOpt, resetShiftByOpt,
|
||||
resetToDatetimeOpt, resetByDurationOpt, resetToEarliestOpt, resetToLatestOpt, resetToCurrentOpt, resetFromFileOpt)
|
||||
val allDeleteOffsetsOpts: Set[OptionSpec[_]] = Set(groupOpt, topicOpt)
|
||||
val allDeleteOffsetsOpts = immutable.Set[OptionSpec[_]](groupOpt, topicOpt)
|
||||
|
||||
def checkArgs(): Unit = {
|
||||
|
||||
|
|
|
@ -29,7 +29,7 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal
|
|||
import org.apache.kafka.common.security.token.delegation.DelegationToken
|
||||
import org.apache.kafka.common.utils.{SecurityUtils, Utils}
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.jdk.CollectionConverters._
|
||||
import scala.collection.Set
|
||||
|
||||
/**
|
||||
|
|
|
@ -28,7 +28,7 @@ import org.apache.kafka.clients.CommonClientConfigs
|
|||
import org.apache.kafka.common.TopicPartition
|
||||
import org.apache.kafka.common.utils.Utils
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.jdk.CollectionConverters._
|
||||
import scala.collection.Seq
|
||||
|
||||
/**
|
||||
|
|
|
@ -32,7 +32,7 @@ import org.apache.kafka.common.errors.ClusterAuthorizationException
|
|||
import org.apache.kafka.common.errors.ElectionNotNeededException
|
||||
import org.apache.kafka.common.errors.TimeoutException
|
||||
import org.apache.kafka.common.utils.Utils
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.jdk.CollectionConverters._
|
||||
import scala.collection.mutable
|
||||
import scala.concurrent.duration._
|
||||
|
||||
|
|
|
@ -25,7 +25,7 @@ import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, DescribeLogDirs
|
|||
import org.apache.kafka.common.requests.DescribeLogDirsResponse.LogDirInfo
|
||||
import org.apache.kafka.common.utils.Utils
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.jdk.CollectionConverters._
|
||||
import scala.collection.Map
|
||||
|
||||
/**
|
||||
|
@ -48,7 +48,7 @@ object LogDirsCommand {
|
|||
|
||||
out.println("Querying brokers for log directories information")
|
||||
val describeLogDirsResult: DescribeLogDirsResult = adminClient.describeLogDirs(brokerList.map(Integer.valueOf).toSeq.asJava)
|
||||
val logDirInfosByBroker = describeLogDirsResult.all.get().asScala.mapValues(_.asScala).toMap
|
||||
val logDirInfosByBroker = describeLogDirsResult.all.get().asScala.map { case (k, v) => k -> v.asScala }
|
||||
|
||||
out.println(s"Received log directory information from brokers ${brokerList.mkString(",")}")
|
||||
out.println(formatAsJson(logDirInfosByBroker, topicList.toSet))
|
||||
|
|
|
@ -16,7 +16,7 @@
|
|||
*/
|
||||
package kafka.admin
|
||||
|
||||
import collection.JavaConverters._
|
||||
import scala.jdk.CollectionConverters._
|
||||
import collection._
|
||||
import java.util.Properties
|
||||
import java.util.concurrent.ExecutionException
|
||||
|
|
|
@ -34,7 +34,7 @@ import org.apache.kafka.common.security.JaasUtils
|
|||
import org.apache.kafka.common.utils.{Time, Utils}
|
||||
import org.apache.kafka.common.{KafkaException, KafkaFuture, TopicPartition, TopicPartitionReplica}
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.jdk.CollectionConverters._
|
||||
import scala.collection.{Map, Seq, mutable}
|
||||
import scala.compat.java8.OptionConverters._
|
||||
import scala.math.Ordered.orderingToOrdered
|
||||
|
@ -1062,17 +1062,16 @@ object ReassignPartitionsCommand extends Logging {
|
|||
def curReassignmentsToString(adminClient: Admin): String = {
|
||||
val currentReassignments = adminClient.
|
||||
listPartitionReassignments().reassignments().get().asScala
|
||||
val text = currentReassignments.keySet.toBuffer.sortWith(compareTopicPartitions).map {
|
||||
case part =>
|
||||
val reassignment = currentReassignments(part)
|
||||
val replicas = reassignment.replicas().asScala
|
||||
val addingReplicas = reassignment.addingReplicas().asScala
|
||||
val removingReplicas = reassignment.removingReplicas().asScala
|
||||
"%s: replicas: %s.%s%s".format(part, replicas.mkString(","),
|
||||
if (addingReplicas.isEmpty) "" else
|
||||
" adding: %s.".format(addingReplicas.mkString(",")),
|
||||
if (removingReplicas.isEmpty) "" else
|
||||
" removing: %s.".format(removingReplicas.mkString(",")))
|
||||
val text = currentReassignments.keySet.toBuffer.sortWith(compareTopicPartitions).map { part =>
|
||||
val reassignment = currentReassignments(part)
|
||||
val replicas = reassignment.replicas().asScala
|
||||
val addingReplicas = reassignment.addingReplicas().asScala
|
||||
val removingReplicas = reassignment.removingReplicas().asScala
|
||||
"%s: replicas: %s.%s%s".format(part, replicas.mkString(","),
|
||||
if (addingReplicas.isEmpty) "" else
|
||||
" adding: %s.".format(addingReplicas.mkString(",")),
|
||||
if (removingReplicas.isEmpty) "" else
|
||||
" removing: %s.".format(removingReplicas.mkString(",")))
|
||||
}.mkString(System.lineSeparator())
|
||||
if (text.isEmpty) {
|
||||
"No partition reassignments found."
|
||||
|
@ -1155,7 +1154,7 @@ object ReassignPartitionsCommand extends Logging {
|
|||
def currentPartitionReplicaAssignmentToString(proposedParts: Map[TopicPartition, Seq[Int]],
|
||||
currentParts: Map[TopicPartition, Seq[Int]]): String = {
|
||||
"Current partition replica assignment%n%n%s%n%nSave this to use as the %s".
|
||||
format(formatAsReassignmentJson(currentParts.filterKeys(proposedParts.contains(_)).toMap, Map.empty),
|
||||
format(formatAsReassignmentJson(currentParts.filter { case (k, _) => proposedParts.contains(k) }.toMap, Map.empty),
|
||||
"--reassignment-json-file option during rollback")
|
||||
}
|
||||
|
||||
|
@ -1192,13 +1191,10 @@ object ReassignPartitionsCommand extends Logging {
|
|||
* @return A map from partition objects to error strings.
|
||||
*/
|
||||
def alterPartitionReassignments(adminClient: Admin,
|
||||
reassignments: Map[TopicPartition, Seq[Int]])
|
||||
: Map[TopicPartition, Throwable] = {
|
||||
reassignments: Map[TopicPartition, Seq[Int]]): Map[TopicPartition, Throwable] = {
|
||||
val results: Map[TopicPartition, KafkaFuture[Void]] =
|
||||
adminClient.alterPartitionReassignments(reassignments.map {
|
||||
case (part, replicas) => {
|
||||
(part, Optional.of(new NewPartitionReassignment(replicas.map(Integer.valueOf(_)).asJava)))
|
||||
}
|
||||
adminClient.alterPartitionReassignments(reassignments.map { case (part, replicas) =>
|
||||
(part, Optional.of(new NewPartitionReassignment(replicas.map(Integer.valueOf(_)).asJava)))
|
||||
}.asJava).values().asScala
|
||||
results.flatMap {
|
||||
case (part, future) => {
|
||||
|
|
|
@ -38,7 +38,7 @@ import org.apache.kafka.common.security.JaasUtils
|
|||
import org.apache.kafka.common.utils.{Time, Utils}
|
||||
import org.apache.zookeeper.KeeperException.NodeExistsException
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.jdk.CollectionConverters._
|
||||
import scala.collection._
|
||||
import scala.compat.java8.OptionConverters._
|
||||
import scala.concurrent.ExecutionException
|
||||
|
@ -653,7 +653,7 @@ object TopicCommand extends Logging {
|
|||
|
||||
options = parser.parse(args : _*)
|
||||
|
||||
private val allTopicLevelOpts: Set[OptionSpec[_]] = Set(alterOpt, createOpt, describeOpt, listOpt, deleteOpt)
|
||||
private val allTopicLevelOpts = immutable.Set[OptionSpec[_]](alterOpt, createOpt, describeOpt, listOpt, deleteOpt)
|
||||
|
||||
private val allReplicationReportOpts: Set[OptionSpec[_]] = Set(reportUnderReplicatedPartitionsOpt, reportUnderMinIsrPartitionsOpt, reportAtMinIsrPartitionsOpt, reportUnavailablePartitionsOpt)
|
||||
|
||||
|
|
|
@ -30,7 +30,7 @@ import org.apache.zookeeper.client.ZKClientConfig
|
|||
import org.apache.zookeeper.data.Stat
|
||||
|
||||
import scala.annotation.tailrec
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.jdk.CollectionConverters._
|
||||
import scala.collection.mutable.Queue
|
||||
import scala.concurrent._
|
||||
import scala.concurrent.duration._
|
||||
|
|
|
@ -19,7 +19,7 @@ package kafka
|
|||
import org.apache.kafka.common.ElectionType
|
||||
import org.apache.kafka.common.TopicPartition
|
||||
import org.apache.kafka.common.requests.ElectLeadersRequest
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.jdk.CollectionConverters._
|
||||
|
||||
package object api {
|
||||
implicit final class ElectLeadersRequestOps(val self: ElectLeadersRequest) extends AnyVal {
|
||||
|
|
|
@ -27,7 +27,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol
|
|||
import org.apache.kafka.server.authorizer.AuthorizerServerInfo
|
||||
|
||||
import scala.collection.Seq
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.jdk.CollectionConverters._
|
||||
|
||||
object Broker {
|
||||
private[cluster] case class ServerInfo(clusterResource: ClusterResource,
|
||||
|
|
|
@ -1,45 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package kafka.cluster
|
||||
|
||||
import scala.collection._
|
||||
|
||||
/**
|
||||
* The set of active brokers in the cluster
|
||||
*/
|
||||
private[kafka] class Cluster {
|
||||
|
||||
private val brokers = new mutable.HashMap[Int, Broker]
|
||||
|
||||
def this(brokerList: Iterable[Broker]) {
|
||||
this()
|
||||
for(broker <- brokerList)
|
||||
brokers.put(broker.id, broker)
|
||||
}
|
||||
|
||||
def getBroker(id: Int): Option[Broker] = brokers.get(id)
|
||||
|
||||
def add(broker: Broker) = brokers.put(broker.id, broker)
|
||||
|
||||
def remove(id: Int) = brokers.remove(id)
|
||||
|
||||
def size = brokers.size
|
||||
|
||||
override def toString: String =
|
||||
"Cluster(" + brokers.values.mkString(", ") + ")"
|
||||
}
|
|
@ -40,7 +40,7 @@ import org.apache.kafka.common.requests.EpochEndOffset._
|
|||
import org.apache.kafka.common.requests._
|
||||
import org.apache.kafka.common.utils.Time
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.jdk.CollectionConverters._
|
||||
import scala.collection.{Map, Seq}
|
||||
|
||||
trait PartitionStateStore {
|
||||
|
@ -662,13 +662,11 @@ class Partition(val topicPartition: TopicPartition,
|
|||
isr: Set[Int],
|
||||
addingReplicas: Seq[Int],
|
||||
removingReplicas: Seq[Int]): Unit = {
|
||||
val replicaSet = assignment.toSet
|
||||
val removedReplicas = remoteReplicasMap.keys -- replicaSet
|
||||
|
||||
remoteReplicasMap.clear()
|
||||
assignment
|
||||
.filter(_ != localBrokerId)
|
||||
.foreach(id => remoteReplicasMap.getAndMaybePut(id, new Replica(id, topicPartition)))
|
||||
removedReplicas.foreach(remoteReplicasMap.remove)
|
||||
|
||||
if (addingReplicas.nonEmpty || removingReplicas.nonEmpty)
|
||||
assignmentState = OngoingReassignmentState(addingReplicas, removingReplicas, assignment)
|
||||
else
|
||||
|
|
|
@ -27,7 +27,7 @@ import org.apache.kafka.common.internals.FatalExitError
|
|||
import org.apache.kafka.common.requests.AbstractRequest
|
||||
import org.apache.kafka.common.utils.Time
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.jdk.CollectionConverters._
|
||||
|
||||
/**
|
||||
* Class for inter-broker send thread that utilize a non-blocking network client.
|
||||
|
|
|
@ -37,7 +37,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol
|
|||
import org.apache.kafka.common.utils.{LogContext, Time}
|
||||
import org.apache.kafka.common.{KafkaException, Node, Reconfigurable, TopicPartition}
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.jdk.CollectionConverters._
|
||||
import scala.collection.mutable.{HashMap, ListBuffer}
|
||||
import scala.collection.{Seq, Set, mutable}
|
||||
|
||||
|
@ -445,8 +445,8 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,
|
|||
else if (config.interBrokerProtocolVersion >= KAFKA_1_0_IV0) 1
|
||||
else 0
|
||||
|
||||
leaderAndIsrRequestMap.filterKeys(controllerContext.liveOrShuttingDownBrokerIds.contains).foreach {
|
||||
case (broker, leaderAndIsrPartitionStates) =>
|
||||
leaderAndIsrRequestMap.foreach { case (broker, leaderAndIsrPartitionStates) =>
|
||||
if (controllerContext.liveOrShuttingDownBrokerIds.contains(broker)) {
|
||||
val numBecomeLeaders = leaderAndIsrPartitionStates.count { case (topicPartition, state) =>
|
||||
val isBecomeLeader = broker == state.leader
|
||||
val typeOfRequest =
|
||||
|
@ -470,8 +470,7 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,
|
|||
val leaderAndIsrResponse = r.asInstanceOf[LeaderAndIsrResponse]
|
||||
sendEvent(LeaderAndIsrResponseReceived(leaderAndIsrResponse, broker))
|
||||
})
|
||||
|
||||
|
||||
}
|
||||
}
|
||||
leaderAndIsrRequestMap.clear()
|
||||
}
|
||||
|
@ -553,25 +552,27 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,
|
|||
}
|
||||
|
||||
val traceEnabled = stateChangeLog.isTraceEnabled
|
||||
stopReplicaRequestMap.filterKeys(controllerContext.liveOrShuttingDownBrokerIds.contains).foreach { case (brokerId, replicaInfoList) =>
|
||||
val (stopReplicaWithDelete, stopReplicaWithoutDelete) = replicaInfoList.partition(r => r.deletePartition)
|
||||
val brokerEpoch = controllerContext.liveBrokerIdAndEpochs(brokerId)
|
||||
stopReplicaRequestMap.foreach { case (brokerId, replicaInfoList) =>
|
||||
if (controllerContext.liveOrShuttingDownBrokerIds.contains(brokerId)) {
|
||||
val (stopReplicaWithDelete, stopReplicaWithoutDelete) = replicaInfoList.partition(r => r.deletePartition)
|
||||
val brokerEpoch = controllerContext.liveBrokerIdAndEpochs(brokerId)
|
||||
|
||||
if (stopReplicaWithDelete.nonEmpty) {
|
||||
stateChangeLog.info(s"Sending a stop replica request (delete = true) for ${stopReplicaWithDelete.size} replicas to broker $brokerId")
|
||||
if (traceEnabled)
|
||||
stateChangeLog.trace(s"The stop replica request (delete = true) sent to broker $brokerId contains ${stopReplicaWithDelete.map(_.replica).mkString(",")}")
|
||||
val stopReplicaRequest = createStopReplicaRequest(brokerEpoch, stopReplicaWithDelete, deletePartitions = true)
|
||||
val callback = stopReplicaPartitionDeleteResponseCallback(brokerId) _
|
||||
sendRequest(brokerId, stopReplicaRequest, callback)
|
||||
}
|
||||
if (stopReplicaWithDelete.nonEmpty) {
|
||||
stateChangeLog.info(s"Sending a stop replica request (delete = true) for ${stopReplicaWithDelete.size} replicas to broker $brokerId")
|
||||
if (traceEnabled)
|
||||
stateChangeLog.trace(s"The stop replica request (delete = true) sent to broker $brokerId contains ${stopReplicaWithDelete.map(_.replica).mkString(",")}")
|
||||
val stopReplicaRequest = createStopReplicaRequest(brokerEpoch, stopReplicaWithDelete, deletePartitions = true)
|
||||
val callback = stopReplicaPartitionDeleteResponseCallback(brokerId) _
|
||||
sendRequest(brokerId, stopReplicaRequest, callback)
|
||||
}
|
||||
|
||||
if (stopReplicaWithoutDelete.nonEmpty) {
|
||||
stateChangeLog.info(s"Sending a stop replica request (delete = false) for ${stopReplicaWithoutDelete.size} replicas to broker $brokerId")
|
||||
if (traceEnabled)
|
||||
stateChangeLog.trace(s"The stop replica request (delete = false) sent to broker $brokerId contains ${stopReplicaWithoutDelete.map(_.replica).mkString(",")}")
|
||||
val stopReplicaRequest = createStopReplicaRequest(brokerEpoch, stopReplicaWithoutDelete, deletePartitions = false)
|
||||
sendRequest(brokerId, stopReplicaRequest)
|
||||
if (stopReplicaWithoutDelete.nonEmpty) {
|
||||
stateChangeLog.info(s"Sending a stop replica request (delete = false) for ${stopReplicaWithoutDelete.size} replicas to broker $brokerId")
|
||||
if (traceEnabled)
|
||||
stateChangeLog.trace(s"The stop replica request (delete = false) sent to broker $brokerId contains ${stopReplicaWithoutDelete.map(_.replica).mkString(",")}")
|
||||
val stopReplicaRequest = createStopReplicaRequest(brokerEpoch, stopReplicaWithoutDelete, deletePartitions = false)
|
||||
sendRequest(brokerId, stopReplicaRequest)
|
||||
}
|
||||
}
|
||||
}
|
||||
stopReplicaRequestMap.clear()
|
||||
|
|
|
@ -74,19 +74,19 @@ case class ReplicaAssignment private (replicas: Seq[Int],
|
|||
class ControllerContext {
|
||||
val stats = new ControllerStats
|
||||
var offlinePartitionCount = 0
|
||||
var shuttingDownBrokerIds: mutable.Set[Int] = mutable.Set.empty
|
||||
private var liveBrokers: Set[Broker] = Set.empty
|
||||
private var liveBrokerEpochs: Map[Int, Long] = Map.empty
|
||||
val shuttingDownBrokerIds = mutable.Set.empty[Int]
|
||||
private val liveBrokers = mutable.Set.empty[Broker]
|
||||
private val liveBrokerEpochs = mutable.Map.empty[Int, Long]
|
||||
var epoch: Int = KafkaController.InitialControllerEpoch
|
||||
var epochZkVersion: Int = KafkaController.InitialControllerEpochZkVersion
|
||||
|
||||
var allTopics: Set[String] = Set.empty
|
||||
val allTopics = mutable.Set.empty[String]
|
||||
val partitionAssignments = mutable.Map.empty[String, mutable.Map[Int, ReplicaAssignment]]
|
||||
val partitionLeadershipInfo = mutable.Map.empty[TopicPartition, LeaderIsrAndControllerEpoch]
|
||||
val partitionsBeingReassigned = mutable.Set.empty[TopicPartition]
|
||||
val partitionStates = mutable.Map.empty[TopicPartition, PartitionState]
|
||||
val replicaStates = mutable.Map.empty[PartitionAndReplica, ReplicaState]
|
||||
val replicasOnOfflineDirs: mutable.Map[Int, Set[TopicPartition]] = mutable.Map.empty
|
||||
val replicasOnOfflineDirs = mutable.Map.empty[Int, Set[TopicPartition]]
|
||||
|
||||
val topicsToBeDeleted = mutable.Set.empty[String]
|
||||
|
||||
|
@ -113,7 +113,7 @@ class ControllerContext {
|
|||
val topicsIneligibleForDeletion = mutable.Set.empty[String]
|
||||
|
||||
private def clearTopicsState(): Unit = {
|
||||
allTopics = Set.empty
|
||||
allTopics.clear()
|
||||
partitionAssignments.clear()
|
||||
partitionLeadershipInfo.clear()
|
||||
partitionsBeingReassigned.clear()
|
||||
|
@ -124,11 +124,10 @@ class ControllerContext {
|
|||
}
|
||||
|
||||
def partitionReplicaAssignment(topicPartition: TopicPartition): Seq[Int] = {
|
||||
partitionAssignments.getOrElse(topicPartition.topic, mutable.Map.empty)
|
||||
.get(topicPartition.partition) match {
|
||||
case Some(partitionAssignment) => partitionAssignment.replicas
|
||||
case None => Seq.empty
|
||||
}
|
||||
partitionAssignments.getOrElse(topicPartition.topic, mutable.Map.empty).get(topicPartition.partition) match {
|
||||
case Some(partitionAssignment) => partitionAssignment.replicas
|
||||
case None => Seq.empty
|
||||
}
|
||||
}
|
||||
|
||||
def partitionFullReplicaAssignment(topicPartition: TopicPartition): ReplicaAssignment = {
|
||||
|
@ -161,21 +160,24 @@ class ControllerContext {
|
|||
}.toSet
|
||||
}
|
||||
|
||||
def setLiveBrokerAndEpochs(brokerAndEpochs: Map[Broker, Long]): Unit = {
|
||||
liveBrokers = brokerAndEpochs.keySet
|
||||
liveBrokerEpochs =
|
||||
brokerAndEpochs map { case (broker, brokerEpoch) => (broker.id, brokerEpoch)}
|
||||
def setLiveBrokers(brokerAndEpochs: Map[Broker, Long]): Unit = {
|
||||
clearLiveBrokers()
|
||||
addLiveBrokers(brokerAndEpochs)
|
||||
}
|
||||
|
||||
def addLiveBrokersAndEpochs(brokerAndEpochs: Map[Broker, Long]): Unit = {
|
||||
liveBrokers = liveBrokers ++ brokerAndEpochs.keySet
|
||||
liveBrokerEpochs = liveBrokerEpochs ++
|
||||
(brokerAndEpochs map { case (broker, brokerEpoch) => (broker.id, brokerEpoch)})
|
||||
private def clearLiveBrokers(): Unit = {
|
||||
liveBrokers.clear()
|
||||
liveBrokerEpochs.clear()
|
||||
}
|
||||
|
||||
def addLiveBrokers(brokerAndEpochs: Map[Broker, Long]): Unit = {
|
||||
liveBrokers ++= brokerAndEpochs.keySet
|
||||
liveBrokerEpochs ++= brokerAndEpochs.map { case (broker, brokerEpoch) => (broker.id, brokerEpoch) }
|
||||
}
|
||||
|
||||
def removeLiveBrokers(brokerIds: Set[Int]): Unit = {
|
||||
liveBrokers = liveBrokers.filter(broker => !brokerIds.contains(broker.id))
|
||||
liveBrokerEpochs = liveBrokerEpochs.filter { case (id, _) => !brokerIds.contains(id) }
|
||||
liveBrokers --= liveBrokers.filter(broker => brokerIds.contains(broker.id))
|
||||
liveBrokerEpochs --= brokerIds
|
||||
}
|
||||
|
||||
def updateBrokerMetadata(oldMetadata: Broker, newMetadata: Broker): Unit = {
|
||||
|
@ -184,7 +186,7 @@ class ControllerContext {
|
|||
}
|
||||
|
||||
// getter
|
||||
def liveBrokerIds: Set[Int] = liveBrokerEpochs.keySet -- shuttingDownBrokerIds
|
||||
def liveBrokerIds: Set[Int] = liveBrokerEpochs.keySet.diff(shuttingDownBrokerIds)
|
||||
def liveOrShuttingDownBrokerIds: Set[Int] = liveBrokerEpochs.keySet
|
||||
def liveOrShuttingDownBrokers: Set[Broker] = liveBrokers
|
||||
def liveBrokerIdAndEpochs: Map[Int, Long] = liveBrokerEpochs
|
||||
|
@ -270,15 +272,20 @@ class ControllerContext {
|
|||
epoch = 0
|
||||
epochZkVersion = 0
|
||||
clearTopicsState()
|
||||
setLiveBrokerAndEpochs(Map.empty)
|
||||
clearLiveBrokers()
|
||||
}
|
||||
|
||||
def setAllTopics(topics: Set[String]): Unit = {
|
||||
allTopics.clear()
|
||||
allTopics ++= topics
|
||||
}
|
||||
|
||||
def removeTopic(topic: String): Unit = {
|
||||
allTopics -= topic
|
||||
partitionAssignments.remove(topic)
|
||||
partitionLeadershipInfo.foreach {
|
||||
case (topicPartition, _) if topicPartition.topic == topic => partitionLeadershipInfo.remove(topicPartition)
|
||||
case _ =>
|
||||
partitionLeadershipInfo.foreach { case (topicPartition, _) =>
|
||||
if (topicPartition.topic == topic)
|
||||
partitionLeadershipInfo.remove(topicPartition)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -27,7 +27,7 @@ import kafka.utils.ShutdownableThread
|
|||
import org.apache.kafka.common.utils.Time
|
||||
|
||||
import scala.collection._
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.jdk.CollectionConverters._
|
||||
|
||||
object ControllerEventManager {
|
||||
val ControllerEventThreadName = "controller-event-thread"
|
||||
|
|
|
@ -39,7 +39,7 @@ import org.apache.kafka.common.utils.Time
|
|||
import org.apache.zookeeper.KeeperException
|
||||
import org.apache.zookeeper.KeeperException.Code
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.jdk.CollectionConverters._
|
||||
import scala.collection.{Map, Seq, Set, immutable, mutable}
|
||||
import scala.collection.mutable.ArrayBuffer
|
||||
import scala.util.{Failure, Try}
|
||||
|
@ -348,7 +348,7 @@ class KafkaController(val config: KafkaConfig,
|
|||
info(s"New broker startup callback for ${newBrokers.mkString(",")}")
|
||||
newBrokers.foreach(controllerContext.replicasOnOfflineDirs.remove)
|
||||
val newBrokersSet = newBrokers.toSet
|
||||
val existingBrokers = controllerContext.liveOrShuttingDownBrokerIds -- newBrokers
|
||||
val existingBrokers = controllerContext.liveOrShuttingDownBrokerIds.diff(newBrokersSet)
|
||||
// Send update metadata request to all the existing brokers in the cluster so that they know about the new brokers
|
||||
// via this update. No need to include any partition states in the request since there are no partition state changes.
|
||||
sendUpdateMetadataRequest(existingBrokers.toSeq, Set.empty)
|
||||
|
@ -725,9 +725,9 @@ class KafkaController(val config: KafkaConfig,
|
|||
private def initializeControllerContext(): Unit = {
|
||||
// update controller cache with delete topic information
|
||||
val curBrokerAndEpochs = zkClient.getAllBrokerAndEpochsInCluster
|
||||
controllerContext.setLiveBrokerAndEpochs(curBrokerAndEpochs)
|
||||
controllerContext.setLiveBrokers(curBrokerAndEpochs)
|
||||
info(s"Initialized broker epochs cache: ${controllerContext.liveBrokerIdAndEpochs}")
|
||||
controllerContext.allTopics = zkClient.getAllTopicsInCluster(true)
|
||||
controllerContext.setAllTopics(zkClient.getAllTopicsInCluster(true))
|
||||
registerPartitionModificationsHandlers(controllerContext.allTopics.toSeq)
|
||||
zkClient.getFullReplicaAssignmentForTopics(controllerContext.allTopics.toSet).foreach {
|
||||
case (topicPartition, replicaAssignment) =>
|
||||
|
@ -736,7 +736,7 @@ class KafkaController(val config: KafkaConfig,
|
|||
controllerContext.partitionsBeingReassigned.add(topicPartition)
|
||||
}
|
||||
controllerContext.partitionLeadershipInfo.clear()
|
||||
controllerContext.shuttingDownBrokerIds = mutable.Set.empty[Int]
|
||||
controllerContext.shuttingDownBrokerIds.clear()
|
||||
// register broker modifications handlers
|
||||
registerBrokerModificationsHandler(controllerContext.liveOrShuttingDownBrokerIds)
|
||||
// update the leader and isr cache for all existing partitions from Zookeeper
|
||||
|
@ -848,8 +848,9 @@ class KafkaController(val config: KafkaConfig,
|
|||
}
|
||||
|
||||
private def updateReplicaAssignmentForPartition(topicPartition: TopicPartition, assignment: ReplicaAssignment): Unit = {
|
||||
var topicAssignment = controllerContext.partitionFullReplicaAssignmentForTopic(topicPartition.topic)
|
||||
topicAssignment += topicPartition -> assignment
|
||||
val topicAssignment = mutable.Map() ++=
|
||||
controllerContext.partitionFullReplicaAssignmentForTopic(topicPartition.topic) +=
|
||||
(topicPartition -> assignment)
|
||||
|
||||
val setDataResponse = zkClient.setTopicAssignmentRaw(topicPartition.topic, topicAssignment, controllerContext.epochZkVersion)
|
||||
setDataResponse.resultCode match {
|
||||
|
@ -1194,9 +1195,9 @@ class KafkaController(val config: KafkaConfig,
|
|||
}
|
||||
|
||||
val previousOfflineReplicas = controllerContext.replicasOnOfflineDirs.getOrElse(brokerId, Set.empty[TopicPartition])
|
||||
val currentOfflineReplicas = previousOfflineReplicas -- onlineReplicas ++ offlineReplicas
|
||||
val currentOfflineReplicas = mutable.Set() ++= previousOfflineReplicas --= onlineReplicas ++= offlineReplicas
|
||||
controllerContext.replicasOnOfflineDirs.put(brokerId, currentOfflineReplicas)
|
||||
val newOfflineReplicas = currentOfflineReplicas -- previousOfflineReplicas
|
||||
val newOfflineReplicas = currentOfflineReplicas.diff(previousOfflineReplicas)
|
||||
|
||||
if (newOfflineReplicas.nonEmpty) {
|
||||
stateChangeLogger.info(s"Mark replicas ${newOfflineReplicas.mkString(",")} on broker $brokerId as offline")
|
||||
|
@ -1221,7 +1222,7 @@ class KafkaController(val config: KafkaConfig,
|
|||
topicDeletionManager.failReplicaDeletion(replicasInError)
|
||||
if (replicasInError.size != partitionErrors.size) {
|
||||
// some replicas could have been successfully deleted
|
||||
val deletedReplicas = partitionErrors.keySet -- partitionsInError
|
||||
val deletedReplicas = partitionErrors.keySet.diff(partitionsInError)
|
||||
topicDeletionManager.completeReplicaDeletion(deletedReplicas.map(PartitionAndReplica(_, replicaId)))
|
||||
}
|
||||
}
|
||||
|
@ -1364,8 +1365,8 @@ class KafkaController(val config: KafkaConfig,
|
|||
val curBrokerIdAndEpochs = curBrokerAndEpochs map { case (broker, epoch) => (broker.id, epoch) }
|
||||
val curBrokerIds = curBrokerIdAndEpochs.keySet
|
||||
val liveOrShuttingDownBrokerIds = controllerContext.liveOrShuttingDownBrokerIds
|
||||
val newBrokerIds = curBrokerIds -- liveOrShuttingDownBrokerIds
|
||||
val deadBrokerIds = liveOrShuttingDownBrokerIds -- curBrokerIds
|
||||
val newBrokerIds = curBrokerIds.diff(liveOrShuttingDownBrokerIds)
|
||||
val deadBrokerIds = liveOrShuttingDownBrokerIds.diff(curBrokerIds)
|
||||
val bouncedBrokerIds = (curBrokerIds & liveOrShuttingDownBrokerIds)
|
||||
.filter(brokerId => curBrokerIdAndEpochs(brokerId) > controllerContext.liveBrokerIdAndEpochs(brokerId))
|
||||
val newBrokerAndEpochs = curBrokerAndEpochs.filter { case (broker, _) => newBrokerIds.contains(broker.id) }
|
||||
|
@ -1384,13 +1385,13 @@ class KafkaController(val config: KafkaConfig,
|
|||
bouncedBrokerAndEpochs.keySet.foreach(controllerChannelManager.addBroker)
|
||||
deadBrokerIds.foreach(controllerChannelManager.removeBroker)
|
||||
if (newBrokerIds.nonEmpty) {
|
||||
controllerContext.addLiveBrokersAndEpochs(newBrokerAndEpochs)
|
||||
controllerContext.addLiveBrokers(newBrokerAndEpochs)
|
||||
onBrokerStartup(newBrokerIdsSorted)
|
||||
}
|
||||
if (bouncedBrokerIds.nonEmpty) {
|
||||
controllerContext.removeLiveBrokers(bouncedBrokerIds)
|
||||
onBrokerFailure(bouncedBrokerIdsSorted)
|
||||
controllerContext.addLiveBrokersAndEpochs(bouncedBrokerAndEpochs)
|
||||
controllerContext.addLiveBrokers(bouncedBrokerAndEpochs)
|
||||
onBrokerStartup(bouncedBrokerIdsSorted)
|
||||
}
|
||||
if (deadBrokerIds.nonEmpty) {
|
||||
|
@ -1422,8 +1423,8 @@ class KafkaController(val config: KafkaConfig,
|
|||
if (!isActive) return
|
||||
val topics = zkClient.getAllTopicsInCluster(true)
|
||||
val newTopics = topics -- controllerContext.allTopics
|
||||
val deletedTopics = controllerContext.allTopics -- topics
|
||||
controllerContext.allTopics = topics
|
||||
val deletedTopics = controllerContext.allTopics.diff(topics)
|
||||
controllerContext.setAllTopics(topics)
|
||||
|
||||
registerPartitionModificationsHandlers(newTopics.toSeq)
|
||||
val addedPartitionReplicaAssignment = zkClient.getFullReplicaAssignmentForTopics(newTopics)
|
||||
|
|
|
@ -434,7 +434,7 @@ class ZkPartitionStateMachine(config: KafkaConfig,
|
|||
val UpdateLeaderAndIsrResult(finishedUpdates, updatesToRetry) = zkClient.updateLeaderAndIsr(
|
||||
adjustedLeaderAndIsrs, controllerContext.epoch, controllerContext.epochZkVersion)
|
||||
finishedUpdates.foreach { case (partition, result) =>
|
||||
result.right.foreach { leaderAndIsr =>
|
||||
result.foreach { leaderAndIsr =>
|
||||
val replicaAssignment = controllerContext.partitionFullReplicaAssignment(partition)
|
||||
val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerContext.epoch)
|
||||
controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
|
||||
|
|
|
@ -332,14 +332,14 @@ class ZkReplicaStateMachine(config: KafkaConfig,
|
|||
): (Map[TopicPartition, Either[Exception, LeaderIsrAndControllerEpoch]], Seq[TopicPartition]) = {
|
||||
val (leaderAndIsrs, partitionsWithNoLeaderAndIsrInZk) = getTopicPartitionStatesFromZk(partitions)
|
||||
val (leaderAndIsrsWithReplica, leaderAndIsrsWithoutReplica) = leaderAndIsrs.partition { case (_, result) =>
|
||||
result.right.map { leaderAndIsr =>
|
||||
result.map { leaderAndIsr =>
|
||||
leaderAndIsr.isr.contains(replicaId)
|
||||
}.right.getOrElse(false)
|
||||
}.getOrElse(false)
|
||||
}
|
||||
|
||||
val adjustedLeaderAndIsrs: Map[TopicPartition, LeaderAndIsr] = leaderAndIsrsWithReplica.flatMap {
|
||||
case (partition, result) =>
|
||||
result.right.toOption.map { leaderAndIsr =>
|
||||
result.toOption.map { leaderAndIsr =>
|
||||
val newLeader = if (replicaId == leaderAndIsr.leader) LeaderAndIsr.NoLeader else leaderAndIsr.leader
|
||||
val adjustedIsr = if (leaderAndIsr.isr.size == 1) leaderAndIsr.isr else leaderAndIsr.isr.filter(_ != replicaId)
|
||||
partition -> leaderAndIsr.newLeaderAndIsr(newLeader, adjustedIsr)
|
||||
|
@ -347,10 +347,7 @@ class ZkReplicaStateMachine(config: KafkaConfig,
|
|||
}
|
||||
|
||||
val UpdateLeaderAndIsrResult(finishedPartitions, updatesToRetry) = zkClient.updateLeaderAndIsr(
|
||||
adjustedLeaderAndIsrs,
|
||||
controllerContext.epoch,
|
||||
controllerContext.epochZkVersion
|
||||
)
|
||||
adjustedLeaderAndIsrs, controllerContext.epoch, controllerContext.epochZkVersion)
|
||||
|
||||
val exceptionsForPartitionsWithNoLeaderAndIsrInZk: Map[TopicPartition, Either[Exception, LeaderIsrAndControllerEpoch]] =
|
||||
partitionsWithNoLeaderAndIsrInZk.iterator.flatMap { partition =>
|
||||
|
@ -364,21 +361,15 @@ class ZkReplicaStateMachine(config: KafkaConfig,
|
|||
}.toMap
|
||||
|
||||
val leaderIsrAndControllerEpochs: Map[TopicPartition, Either[Exception, LeaderIsrAndControllerEpoch]] =
|
||||
(leaderAndIsrsWithoutReplica ++ finishedPartitions).map { case (partition, result: Either[Exception, LeaderAndIsr]) =>
|
||||
(
|
||||
partition,
|
||||
result.right.map { leaderAndIsr =>
|
||||
val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerContext.epoch)
|
||||
controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
|
||||
leaderIsrAndControllerEpoch
|
||||
}
|
||||
)
|
||||
(leaderAndIsrsWithoutReplica ++ finishedPartitions).map { case (partition, result) =>
|
||||
(partition, result.map { leaderAndIsr =>
|
||||
val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerContext.epoch)
|
||||
controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
|
||||
leaderIsrAndControllerEpoch
|
||||
})
|
||||
}
|
||||
|
||||
(
|
||||
leaderIsrAndControllerEpochs ++ exceptionsForPartitionsWithNoLeaderAndIsrInZk,
|
||||
updatesToRetry
|
||||
)
|
||||
(leaderIsrAndControllerEpochs ++ exceptionsForPartitionsWithNoLeaderAndIsrInZk, updatesToRetry)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -257,7 +257,7 @@ class TopicDeletionManager(config: KafkaConfig,
|
|||
* removed from their caches.
|
||||
*/
|
||||
private def onTopicDeletion(topics: Set[String]): Unit = {
|
||||
val unseenTopicsForDeletion = topics -- controllerContext.topicsWithDeletionStarted
|
||||
val unseenTopicsForDeletion = topics.diff(controllerContext.topicsWithDeletionStarted)
|
||||
if (unseenTopicsForDeletion.nonEmpty) {
|
||||
val unseenPartitionsForDeletion = unseenTopicsForDeletion.flatMap(controllerContext.partitionsForTopic)
|
||||
partitionStateMachine.handleStateChanges(unseenPartitionsForDeletion.toSeq, OfflinePartition)
|
||||
|
@ -295,7 +295,7 @@ class TopicDeletionManager(config: KafkaConfig,
|
|||
}
|
||||
|
||||
val successfullyDeletedReplicas = controllerContext.replicasInState(topic, ReplicaDeletionSuccessful)
|
||||
val replicasForDeletionRetry = aliveReplicas -- successfullyDeletedReplicas
|
||||
val replicasForDeletionRetry = aliveReplicas.diff(successfullyDeletedReplicas)
|
||||
|
||||
allDeadReplicas ++= deadReplicas
|
||||
allReplicasForDeletionRetry ++= replicasForDeletionRetry
|
||||
|
|
|
@ -35,7 +35,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
|
|||
import org.apache.kafka.common.requests._
|
||||
import org.apache.kafka.common.utils.Time
|
||||
|
||||
import scala.collection.{Map, Seq, immutable}
|
||||
import scala.collection.{Map, Seq, immutable, mutable}
|
||||
import scala.math.max
|
||||
|
||||
/**
|
||||
|
@ -423,7 +423,7 @@ class GroupCoordinator(val brokerId: Int,
|
|||
info(s"Assignment received from leader for group ${group.groupId} for generation ${group.generationId}")
|
||||
|
||||
// fill any missing members with an empty assignment
|
||||
val missing = group.allMembers -- groupAssignment.keySet
|
||||
val missing = group.allMembers.diff(groupAssignment.keySet)
|
||||
val assignment = groupAssignment ++ missing.map(_ -> Array.empty[Byte]).toMap
|
||||
|
||||
if (missing.nonEmpty) {
|
||||
|
@ -519,8 +519,8 @@ class GroupCoordinator(val brokerId: Int,
|
|||
}
|
||||
|
||||
def handleDeleteGroups(groupIds: Set[String]): Map[String, Errors] = {
|
||||
var groupErrors: Map[String, Errors] = Map()
|
||||
var groupsEligibleForDeletion: Seq[GroupMetadata] = Seq()
|
||||
val groupErrors = mutable.Map.empty[String, Errors]
|
||||
val groupsEligibleForDeletion = mutable.ArrayBuffer[GroupMetadata]()
|
||||
|
||||
groupIds.foreach { groupId =>
|
||||
validateGroupStatus(groupId, ApiKeys.DELETE_GROUPS) match {
|
||||
|
@ -540,9 +540,9 @@ class GroupCoordinator(val brokerId: Int,
|
|||
(if (groupManager.groupNotExists(groupId)) Errors.GROUP_ID_NOT_FOUND else Errors.NOT_COORDINATOR)
|
||||
case Empty =>
|
||||
group.transitionTo(Dead)
|
||||
groupsEligibleForDeletion :+= group
|
||||
groupsEligibleForDeletion += group
|
||||
case Stable | PreparingRebalance | CompletingRebalance =>
|
||||
groupErrors += groupId -> Errors.NON_EMPTY_GROUP
|
||||
groupErrors(groupId) = Errors.NON_EMPTY_GROUP
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -30,7 +30,7 @@ import org.apache.kafka.common.protocol.types.SchemaException
|
|||
import org.apache.kafka.common.utils.Time
|
||||
|
||||
import scala.collection.{Seq, immutable, mutable}
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.jdk.CollectionConverters._
|
||||
|
||||
private[group] sealed trait GroupState {
|
||||
val validPreviousStates: Set[GroupState]
|
||||
|
@ -411,22 +411,21 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
|
|||
val candidates = candidateProtocols
|
||||
|
||||
// let each member vote for one of the protocols and choose the one with the most votes
|
||||
val votes: List[(String, Int)] = allMemberMetadata
|
||||
val (protocol, _) = allMemberMetadata
|
||||
.map(_.vote(candidates))
|
||||
.groupBy(identity)
|
||||
.mapValues(_.size)
|
||||
.toList
|
||||
.maxBy { case (_, votes) => votes.size }
|
||||
|
||||
votes.maxBy(_._2)._1
|
||||
protocol
|
||||
}
|
||||
|
||||
private def candidateProtocols = {
|
||||
private def candidateProtocols: Set[String] = {
|
||||
// get the set of protocols that are commonly supported by all members
|
||||
val numMembers = members.size
|
||||
supportedProtocols.filter(_._2 == numMembers).map(_._1).toSet
|
||||
}
|
||||
|
||||
def supportsProtocols(memberProtocolType: String, memberProtocols: Set[String]) = {
|
||||
def supportsProtocols(memberProtocolType: String, memberProtocols: Set[String]): Boolean = {
|
||||
if (is(Empty))
|
||||
!memberProtocolType.isEmpty && memberProtocols.nonEmpty
|
||||
else
|
||||
|
@ -467,11 +466,10 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
|
|||
}.reduceLeft(_ ++ _)
|
||||
)
|
||||
} catch {
|
||||
case e: SchemaException => {
|
||||
case e: SchemaException =>
|
||||
warn(s"Failed to parse Consumer Protocol ${ConsumerProtocol.PROTOCOL_TYPE}:${protocolName.get} " +
|
||||
s"of group $groupId. Consumer group coordinator is not aware of the subscribed topics.", e)
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
case Some(ConsumerProtocol.PROTOCOL_TYPE) if members.isEmpty =>
|
||||
|
@ -483,7 +481,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
|
|||
|
||||
def updateMember(member: MemberMetadata,
|
||||
protocols: List[(String, Array[Byte])],
|
||||
callback: JoinCallback) = {
|
||||
callback: JoinCallback): Unit = {
|
||||
member.supportedProtocols.foreach{ case (protocol, _) => supportedProtocols(protocol) -= 1 }
|
||||
protocols.foreach{ case (protocol, _) => supportedProtocols(protocol) += 1 }
|
||||
member.supportedProtocols = protocols
|
||||
|
@ -519,7 +517,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
|
|||
}
|
||||
}
|
||||
|
||||
def initNextGeneration() = {
|
||||
def initNextGeneration(): Unit = {
|
||||
if (members.nonEmpty) {
|
||||
generationId += 1
|
||||
protocolName = Some(selectProtocol)
|
||||
|
|
|
@ -49,7 +49,7 @@ import org.apache.kafka.common.requests.{OffsetCommitRequest, OffsetFetchRespons
|
|||
import org.apache.kafka.common.utils.{Time, Utils}
|
||||
import org.apache.kafka.common.{KafkaException, TopicPartition}
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.jdk.CollectionConverters._
|
||||
import scala.collection._
|
||||
import scala.collection.mutable.ArrayBuffer
|
||||
|
||||
|
@ -534,7 +534,7 @@ class GroupMetadataManager(brokerId: Int,
|
|||
doLoadGroupsAndOffsets(topicPartition, onGroupLoaded)
|
||||
val endTimeMs = time.milliseconds()
|
||||
val totalLoadingTimeMs = endTimeMs - startTimeMs
|
||||
partitionLoadSensor.record(totalLoadingTimeMs, endTimeMs, false)
|
||||
partitionLoadSensor.record(totalLoadingTimeMs.toDouble, endTimeMs, false)
|
||||
info(s"Finished loading offsets and group metadata from $topicPartition "
|
||||
+ s"in $totalLoadingTimeMs milliseconds, of which $schedulerTimeMs milliseconds"
|
||||
+ s" was spent in the scheduler.")
|
||||
|
@ -666,19 +666,21 @@ class GroupMetadataManager(brokerId: Int,
|
|||
|
||||
val (groupOffsets, emptyGroupOffsets) = loadedOffsets
|
||||
.groupBy(_._1.group)
|
||||
.mapValues(_.map { case (groupTopicPartition, offset) => (groupTopicPartition.topicPartition, offset) })
|
||||
.partition { case (group, _) => loadedGroups.contains(group) }
|
||||
.map { case (k, v) =>
|
||||
k -> v.map { case (groupTopicPartition, offset) => (groupTopicPartition.topicPartition, offset) }
|
||||
}.partition { case (group, _) => loadedGroups.contains(group) }
|
||||
|
||||
val pendingOffsetsByGroup = mutable.Map[String, mutable.Map[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]]]()
|
||||
pendingOffsets.foreach { case (producerId, producerOffsets) =>
|
||||
producerOffsets.keySet.map(_.group).foreach(addProducerGroup(producerId, _))
|
||||
producerOffsets
|
||||
.groupBy(_._1.group)
|
||||
.mapValues(_.map { case (groupTopicPartition, offset) => (groupTopicPartition.topicPartition, offset)})
|
||||
.foreach { case (group, offsets) =>
|
||||
val groupPendingOffsets = pendingOffsetsByGroup.getOrElseUpdate(group, mutable.Map.empty[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]])
|
||||
val groupProducerOffsets = groupPendingOffsets.getOrElseUpdate(producerId, mutable.Map.empty[TopicPartition, CommitRecordMetadataAndOffset])
|
||||
groupProducerOffsets ++= offsets
|
||||
groupProducerOffsets ++= offsets.map { case (groupTopicPartition, offset) =>
|
||||
(groupTopicPartition.topicPartition, offset)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -22,7 +22,7 @@ import kafka.utils.{Json, Logging}
|
|||
import kafka.zk.{KafkaZkClient, ProducerIdBlockZNode}
|
||||
import org.apache.kafka.common.KafkaException
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.jdk.CollectionConverters._
|
||||
|
||||
/**
|
||||
* ProducerIdManager is the part of the transaction coordinator that provides ProducerIds in a unique way
|
||||
|
|
|
@ -118,7 +118,7 @@ class TransactionCoordinator(brokerId: Int,
|
|||
// check transactionTimeoutMs is not larger than the broker configured maximum allowed value
|
||||
responseCallback(initTransactionError(Errors.INVALID_TRANSACTION_TIMEOUT))
|
||||
} else {
|
||||
val coordinatorEpochAndMetadata = txnManager.getTransactionState(transactionalId).right.flatMap {
|
||||
val coordinatorEpochAndMetadata = txnManager.getTransactionState(transactionalId).flatMap {
|
||||
case None =>
|
||||
val producerId = producerIdManager.generateProducerId()
|
||||
val createdMetadata = new TransactionMetadata(transactionalId = transactionalId,
|
||||
|
@ -135,7 +135,7 @@ class TransactionCoordinator(brokerId: Int,
|
|||
case Some(epochAndTxnMetadata) => Right(epochAndTxnMetadata)
|
||||
}
|
||||
|
||||
val result: ApiResult[(Int, TxnTransitMetadata)] = coordinatorEpochAndMetadata.right.flatMap {
|
||||
val result: ApiResult[(Int, TxnTransitMetadata)] = coordinatorEpochAndMetadata.flatMap {
|
||||
existingEpochAndMetadata =>
|
||||
val coordinatorEpoch = existingEpochAndMetadata.coordinatorEpoch
|
||||
val txnMetadata = existingEpochAndMetadata.transactionMetadata
|
||||
|
@ -266,7 +266,7 @@ class TransactionCoordinator(brokerId: Int,
|
|||
} else {
|
||||
// try to update the transaction metadata and append the updated metadata to txn log;
|
||||
// if there is no such metadata treat it as invalid producerId mapping error.
|
||||
val result: ApiResult[(Int, TxnTransitMetadata)] = txnManager.getTransactionState(transactionalId).right.flatMap {
|
||||
val result: ApiResult[(Int, TxnTransitMetadata)] = txnManager.getTransactionState(transactionalId).flatMap {
|
||||
case None => Left(Errors.INVALID_PRODUCER_ID_MAPPING)
|
||||
|
||||
case Some(epochAndMetadata) =>
|
||||
|
@ -368,7 +368,7 @@ class TransactionCoordinator(brokerId: Int,
|
|||
if (transactionalId == null || transactionalId.isEmpty)
|
||||
responseCallback(Errors.INVALID_REQUEST)
|
||||
else {
|
||||
val preAppendResult: ApiResult[(Int, TxnTransitMetadata)] = txnManager.getTransactionState(transactionalId).right.flatMap {
|
||||
val preAppendResult: ApiResult[(Int, TxnTransitMetadata)] = txnManager.getTransactionState(transactionalId).flatMap {
|
||||
case None =>
|
||||
Left(Errors.INVALID_PRODUCER_ID_MAPPING)
|
||||
|
||||
|
@ -440,7 +440,7 @@ class TransactionCoordinator(brokerId: Int,
|
|||
case Right((coordinatorEpoch, newMetadata)) =>
|
||||
def sendTxnMarkersCallback(error: Errors): Unit = {
|
||||
if (error == Errors.NONE) {
|
||||
val preSendResult: ApiResult[(TransactionMetadata, TxnTransitMetadata)] = txnManager.getTransactionState(transactionalId).right.flatMap {
|
||||
val preSendResult: ApiResult[(TransactionMetadata, TxnTransitMetadata)] = txnManager.getTransactionState(transactionalId).flatMap {
|
||||
case None =>
|
||||
val errorMsg = s"The coordinator still owns the transaction partition for $transactionalId, but there is " +
|
||||
s"no metadata in the cache; this is not expected"
|
||||
|
@ -535,7 +535,7 @@ class TransactionCoordinator(brokerId: Int,
|
|||
private[transaction] def abortTimedOutTransactions(onComplete: TransactionalIdAndProducerIdEpoch => EndTxnCallback): Unit = {
|
||||
|
||||
txnManager.timedOutTransactions().foreach { txnIdAndPidEpoch =>
|
||||
txnManager.getTransactionState(txnIdAndPidEpoch.transactionalId).right.foreach {
|
||||
txnManager.getTransactionState(txnIdAndPidEpoch.transactionalId).foreach {
|
||||
case None =>
|
||||
error(s"Could not find transaction metadata when trying to timeout transaction for $txnIdAndPidEpoch")
|
||||
|
||||
|
|
|
@ -34,7 +34,7 @@ import org.apache.kafka.common.security.JaasContext
|
|||
import org.apache.kafka.common.utils.{LogContext, Time}
|
||||
import org.apache.kafka.common.{Node, Reconfigurable, TopicPartition}
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.jdk.CollectionConverters._
|
||||
import scala.collection.{concurrent, immutable}
|
||||
|
||||
object TransactionMarkerChannelManager {
|
||||
|
|
|
@ -24,7 +24,7 @@ import org.apache.kafka.common.protocol.Errors
|
|||
import org.apache.kafka.common.requests.WriteTxnMarkersResponse
|
||||
|
||||
import scala.collection.mutable
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.jdk.CollectionConverters._
|
||||
|
||||
class TransactionMarkerRequestCompletionHandler(brokerId: Int,
|
||||
txnStateManager: TransactionStateManager,
|
||||
|
|
|
@ -38,7 +38,7 @@ import org.apache.kafka.common.requests.TransactionResult
|
|||
import org.apache.kafka.common.utils.{Time, Utils}
|
||||
import org.apache.kafka.common.{KafkaException, TopicPartition}
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.jdk.CollectionConverters._
|
||||
import scala.collection.mutable
|
||||
|
||||
|
||||
|
@ -216,8 +216,8 @@ class TransactionStateManager(brokerId: Int,
|
|||
}
|
||||
|
||||
def putTransactionStateIfNotExists(txnMetadata: TransactionMetadata): Either[Errors, CoordinatorEpochAndTxnMetadata] = {
|
||||
getAndMaybeAddTransactionState(txnMetadata.transactionalId, Some(txnMetadata))
|
||||
.right.map(_.getOrElse(throw new IllegalStateException(s"Unexpected empty transaction metadata returned while putting $txnMetadata")))
|
||||
getAndMaybeAddTransactionState(txnMetadata.transactionalId, Some(txnMetadata)).map(_.getOrElse(
|
||||
throw new IllegalStateException(s"Unexpected empty transaction metadata returned while putting $txnMetadata")))
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -391,7 +391,7 @@ class TransactionStateManager(brokerId: Int,
|
|||
val loadedTransactions = loadTransactionMetadata(topicPartition, coordinatorEpoch)
|
||||
val endTimeMs = time.milliseconds()
|
||||
val totalLoadingTimeMs = endTimeMs - startTimeMs
|
||||
partitionLoadSensor.record(totalLoadingTimeMs, endTimeMs, false)
|
||||
partitionLoadSensor.record(totalLoadingTimeMs.toDouble, endTimeMs, false)
|
||||
info(s"Finished loading ${loadedTransactions.size} transaction metadata from $topicPartition in " +
|
||||
s"$totalLoadingTimeMs milliseconds, of which $schedulerTimeMs milliseconds was spent in the scheduler.")
|
||||
|
||||
|
@ -439,7 +439,7 @@ class TransactionStateManager(brokerId: Int,
|
|||
def removeTransactionsForTxnTopicPartition(partitionId: Int): Unit = {
|
||||
val topicPartition = new TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partitionId)
|
||||
inWriteLock(stateLock) {
|
||||
loadingPartitions.retain(_.txnPartitionId != partitionId)
|
||||
loadingPartitions --= loadingPartitions.filter(_.txnPartitionId == partitionId)
|
||||
transactionMetadataCache.remove(partitionId).foreach { txnMetadataCacheEntry =>
|
||||
info(s"Unloaded transaction metadata $txnMetadataCacheEntry for $topicPartition following " +
|
||||
s"local partition deletion")
|
||||
|
|
|
@ -44,7 +44,7 @@ import org.apache.kafka.common.requests.{EpochEndOffset, ListOffsetRequest}
|
|||
import org.apache.kafka.common.utils.{Time, Utils}
|
||||
import org.apache.kafka.common.{InvalidRecordException, KafkaException, TopicPartition}
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.jdk.CollectionConverters._
|
||||
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
|
||||
import scala.collection.{Seq, Set, mutable}
|
||||
|
||||
|
@ -531,8 +531,8 @@ class Log(@volatile private var _dir: File,
|
|||
Files.deleteIfExists(Log.transactionIndexFile(dir, offset, suffix).toPath)
|
||||
}
|
||||
|
||||
var swapFiles = Set[File]()
|
||||
var cleanFiles = Set[File]()
|
||||
val swapFiles = mutable.Set[File]()
|
||||
val cleanFiles = mutable.Set[File]()
|
||||
var minCleanedFileOffset = Long.MaxValue
|
||||
|
||||
for (file <- dir.listFiles if file.isFile) {
|
||||
|
@ -767,7 +767,7 @@ class Log(@volatile private var _dir: File,
|
|||
// if we have the clean shutdown marker, skip recovery
|
||||
if (!hasCleanShutdownFile) {
|
||||
// okay we need to actually recover this log
|
||||
val unflushed = logSegments(this.recoveryPoint, Long.MaxValue).toIterator
|
||||
val unflushed = logSegments(this.recoveryPoint, Long.MaxValue).iterator
|
||||
var truncated = false
|
||||
|
||||
while (unflushed.hasNext && !truncated) {
|
||||
|
|
|
@ -34,7 +34,7 @@ import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention
|
|||
import org.apache.kafka.common.record._
|
||||
import org.apache.kafka.common.utils.Time
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.jdk.CollectionConverters._
|
||||
import scala.collection.mutable.ListBuffer
|
||||
import scala.collection.{Iterable, Seq, Set, mutable}
|
||||
import scala.util.control.ControlThrowable
|
||||
|
@ -381,19 +381,19 @@ class LogCleaner(initialConfig: CleanerConfig,
|
|||
def mb(bytes: Double) = bytes / (1024*1024)
|
||||
val message =
|
||||
"%n\tLog cleaner thread %d cleaned log %s (dirty section = [%d, %d])%n".format(id, name, from, to) +
|
||||
"\t%,.1f MB of log processed in %,.1f seconds (%,.1f MB/sec).%n".format(mb(stats.bytesRead),
|
||||
"\t%,.1f MB of log processed in %,.1f seconds (%,.1f MB/sec).%n".format(mb(stats.bytesRead.toDouble),
|
||||
stats.elapsedSecs,
|
||||
mb(stats.bytesRead/stats.elapsedSecs)) +
|
||||
"\tIndexed %,.1f MB in %.1f seconds (%,.1f Mb/sec, %.1f%% of total time)%n".format(mb(stats.mapBytesRead),
|
||||
mb(stats.bytesRead.toDouble / stats.elapsedSecs)) +
|
||||
"\tIndexed %,.1f MB in %.1f seconds (%,.1f Mb/sec, %.1f%% of total time)%n".format(mb(stats.mapBytesRead.toDouble),
|
||||
stats.elapsedIndexSecs,
|
||||
mb(stats.mapBytesRead)/stats.elapsedIndexSecs,
|
||||
100 * stats.elapsedIndexSecs/stats.elapsedSecs) +
|
||||
mb(stats.mapBytesRead.toDouble) / stats.elapsedIndexSecs,
|
||||
100 * stats.elapsedIndexSecs / stats.elapsedSecs) +
|
||||
"\tBuffer utilization: %.1f%%%n".format(100 * stats.bufferUtilization) +
|
||||
"\tCleaned %,.1f MB in %.1f seconds (%,.1f Mb/sec, %.1f%% of total time)%n".format(mb(stats.bytesRead),
|
||||
"\tCleaned %,.1f MB in %.1f seconds (%,.1f Mb/sec, %.1f%% of total time)%n".format(mb(stats.bytesRead.toDouble),
|
||||
stats.elapsedSecs - stats.elapsedIndexSecs,
|
||||
mb(stats.bytesRead)/(stats.elapsedSecs - stats.elapsedIndexSecs), 100 * (stats.elapsedSecs - stats.elapsedIndexSecs).toDouble/stats.elapsedSecs) +
|
||||
"\tStart size: %,.1f MB (%,d messages)%n".format(mb(stats.bytesRead), stats.messagesRead) +
|
||||
"\tEnd size: %,.1f MB (%,d messages)%n".format(mb(stats.bytesWritten), stats.messagesWritten) +
|
||||
mb(stats.bytesRead.toDouble) / (stats.elapsedSecs - stats.elapsedIndexSecs), 100 * (stats.elapsedSecs - stats.elapsedIndexSecs) / stats.elapsedSecs) +
|
||||
"\tStart size: %,.1f MB (%,d messages)%n".format(mb(stats.bytesRead.toDouble), stats.messagesRead) +
|
||||
"\tEnd size: %,.1f MB (%,d messages)%n".format(mb(stats.bytesWritten.toDouble), stats.messagesWritten) +
|
||||
"\t%.1f%% size reduction (%.1f%% fewer messages)%n".format(100.0 * (1.0 - stats.bytesWritten.toDouble/stats.bytesRead),
|
||||
100.0 * (1.0 - stats.messagesWritten.toDouble/stats.messagesRead))
|
||||
info(message)
|
||||
|
@ -1034,9 +1034,9 @@ private class CleanerStats(time: Time = Time.SYSTEM) {
|
|||
endTime = time.milliseconds
|
||||
}
|
||||
|
||||
def elapsedSecs = (endTime - startTime)/1000.0
|
||||
def elapsedSecs: Double = (endTime - startTime) / 1000.0
|
||||
|
||||
def elapsedIndexSecs = (mapCompleteTime - startTime)/1000.0
|
||||
def elapsedIndexSecs: Double = (mapCompleteTime - startTime) / 1000.0
|
||||
|
||||
}
|
||||
|
||||
|
|
|
@ -410,7 +410,7 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
|
|||
if (checkpoint != null) {
|
||||
val existing = checkpoint.read()
|
||||
if (existing.getOrElse(topicPartition, 0L) > offset)
|
||||
checkpoint.write(existing + (topicPartition -> offset))
|
||||
checkpoint.write(mutable.Map() ++= existing += topicPartition -> offset)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,7 +19,7 @@ package kafka.log
|
|||
|
||||
import java.util.{Collections, Locale, Properties}
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.jdk.CollectionConverters._
|
||||
import kafka.api.{ApiVersion, ApiVersionValidator}
|
||||
import kafka.message.BrokerCompressionCodec
|
||||
import kafka.server.{KafkaConfig, ThrottledReplicaListValidator}
|
||||
|
|
|
@ -30,7 +30,7 @@ import org.apache.kafka.common.{KafkaException, TopicPartition}
|
|||
import org.apache.kafka.common.utils.Time
|
||||
import org.apache.kafka.common.errors.{KafkaStorageException, LogDirNotFoundException}
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.jdk.CollectionConverters._
|
||||
import scala.collection._
|
||||
import scala.collection.mutable.ArrayBuffer
|
||||
import scala.util.{Failure, Success, Try}
|
||||
|
@ -741,7 +741,7 @@ class LogManager(logDirs: Seq[File],
|
|||
}
|
||||
|
||||
val logDir = logDirs
|
||||
.toStream // to prevent actually mapping the whole list, lazy map
|
||||
.iterator // to prevent actually mapping the whole list, lazy map
|
||||
.map(createLogDirectory(_, logDirName))
|
||||
.find(_.isSuccess)
|
||||
.getOrElse(Failure(new KafkaStorageException("No log directories available. Tried " + logDirs.map(_.getAbsolutePath).mkString(", "))))
|
||||
|
@ -934,7 +934,7 @@ class LogManager(logDirs: Seq[File],
|
|||
List(_liveLogDirs.peek())
|
||||
} else {
|
||||
// count the number of logs in each parent directory (including 0 for empty directories
|
||||
val logCounts = allLogs.groupBy(_.parentDir).mapValues(_.size)
|
||||
val logCounts = allLogs.groupBy(_.parentDir).map { case (parent, logs) => parent -> logs.size }
|
||||
val zeros = _liveLogDirs.asScala.map(dir => (dir.getPath, 0)).toMap
|
||||
val dirCounts = (zeros ++ logCounts).toBuffer
|
||||
|
||||
|
|
|
@ -32,7 +32,7 @@ import org.apache.kafka.common.record.FileRecords.{LogOffsetPosition, TimestampA
|
|||
import org.apache.kafka.common.record._
|
||||
import org.apache.kafka.common.utils.Time
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.jdk.CollectionConverters._
|
||||
import scala.math._
|
||||
|
||||
/**
|
||||
|
|
|
@ -32,7 +32,7 @@ import org.apache.kafka.common.requests.ProduceResponse.RecordError
|
|||
import org.apache.kafka.common.utils.Time
|
||||
|
||||
import scala.collection.{Seq, mutable}
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.jdk.CollectionConverters._
|
||||
import scala.collection.mutable.ArrayBuffer
|
||||
|
||||
/**
|
||||
|
|
|
@ -147,7 +147,7 @@ class OffsetIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writabl
|
|||
mmap.putInt(position)
|
||||
_entries += 1
|
||||
_lastOffset = offset
|
||||
require(_entries * entrySize == mmap.position(), entries + " entries but file position in index is " + mmap.position() + ".")
|
||||
require(_entries * entrySize == mmap.position(), s"$entries entries but file position in index is ${mmap.position()}.")
|
||||
} else {
|
||||
throw new InvalidOffsetException(s"Attempt to append an offset ($offset) to position $entries no larger than" +
|
||||
s" the last offset appended (${_lastOffset}) to ${file.getAbsolutePath}.")
|
||||
|
|
|
@ -30,7 +30,7 @@ import org.apache.kafka.common.protocol.types._
|
|||
import org.apache.kafka.common.record.{ControlRecordType, DefaultRecordBatch, EndTransactionMarker, RecordBatch}
|
||||
import org.apache.kafka.common.utils.{ByteUtils, Crc32C}
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.jdk.CollectionConverters._
|
||||
import scala.collection.mutable.ListBuffer
|
||||
import scala.collection.{immutable, mutable}
|
||||
|
||||
|
@ -580,9 +580,7 @@ class ProducerStateManager(val topicPartition: TopicPartition,
|
|||
* Expire any producer ids which have been idle longer than the configured maximum expiration timeout.
|
||||
*/
|
||||
def removeExpiredProducers(currentTimeMs: Long): Unit = {
|
||||
producers.retain { case (_, lastEntry) =>
|
||||
!isProducerExpired(currentTimeMs, lastEntry)
|
||||
}
|
||||
producers --= producers.filter { case (_, lastEntry) => isProducerExpired(currentTimeMs, lastEntry) }.keySet
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -135,7 +135,7 @@ class TimeIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable:
|
|||
mmap.putInt(relativeOffset(offset))
|
||||
_entries += 1
|
||||
_lastEntry = TimestampOffset(timestamp, offset)
|
||||
require(_entries * entrySize == mmap.position(), _entries + " entries but file position in index is " + mmap.position() + ".")
|
||||
require(_entries * entrySize == mmap.position(), s"${_entries} entries but file position in index is ${mmap.position()}.")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -39,7 +39,7 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal
|
|||
import org.apache.kafka.common.utils.{Sanitizer, Time}
|
||||
|
||||
import scala.collection.mutable
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.jdk.CollectionConverters._
|
||||
import scala.reflect.ClassTag
|
||||
|
||||
object RequestChannel extends Logging {
|
||||
|
|
|
@ -51,7 +51,7 @@ import org.apache.kafka.common.utils.{KafkaThread, LogContext, Time}
|
|||
import org.slf4j.event.Level
|
||||
|
||||
import scala.collection._
|
||||
import JavaConverters._
|
||||
import scala.jdk.CollectionConverters._
|
||||
import scala.collection.mutable.{ArrayBuffer, Buffer}
|
||||
import scala.util.control.ControlThrowable
|
||||
|
||||
|
@ -180,7 +180,7 @@ class SocketServer(val config: KafkaConfig,
|
|||
.find(_.listenerName == config.interBrokerListenerName)
|
||||
.getOrElse(throw new IllegalStateException(s"Inter-broker listener ${config.interBrokerListenerName} not found, endpoints=${dataPlaneAcceptors.keySet}"))
|
||||
val orderedAcceptors = List(dataPlaneAcceptors.get(interBrokerListener)) ++
|
||||
dataPlaneAcceptors.asScala.filterKeys(_ != interBrokerListener).values
|
||||
dataPlaneAcceptors.asScala.filter { case (k, _) => k != interBrokerListener }.values
|
||||
orderedAcceptors.foreach { acceptor =>
|
||||
val endpoint = acceptor.endPoint
|
||||
debug(s"Wait for authorizer to complete start up on listener ${endpoint.listenerName}")
|
||||
|
|
|
@ -70,7 +70,7 @@ case class Resource(resourceType: ResourceType, name: String, patternType: Patte
|
|||
* @deprecated Since 2.0, use [[kafka.security.auth.Resource(ResourceType, String, PatternType)]]
|
||||
*/
|
||||
@deprecated("Use Resource(ResourceType, String, PatternType", "Since 2.0")
|
||||
def this(resourceType: ResourceType, name: String) {
|
||||
def this(resourceType: ResourceType, name: String) = {
|
||||
this(resourceType, name, PatternType.LITERAL)
|
||||
}
|
||||
|
||||
|
|
|
@ -30,7 +30,7 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal
|
|||
import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext, AuthorizationResult}
|
||||
|
||||
import scala.collection.mutable
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.jdk.CollectionConverters._
|
||||
import scala.compat.java8.OptionConverters._
|
||||
|
||||
@deprecated("Use kafka.security.authorizer.AclAuthorizer", "Since 2.4")
|
||||
|
|
|
@ -21,7 +21,7 @@ import java.util.concurrent.{CompletableFuture, CompletionStage}
|
|||
|
||||
import com.typesafe.scalalogging.Logger
|
||||
import kafka.api.KAFKA_2_0_IV1
|
||||
import kafka.security.authorizer.AclAuthorizer.{AclBuffers, ResourceOrdering, VersionedAcls}
|
||||
import kafka.security.authorizer.AclAuthorizer.{AclSeqs, ResourceOrdering, VersionedAcls}
|
||||
import kafka.security.authorizer.AclEntry.ResourceSeparator
|
||||
import kafka.server.{KafkaConfig, KafkaServer}
|
||||
import kafka.utils._
|
||||
|
@ -39,8 +39,8 @@ import org.apache.kafka.server.authorizer.AclDeleteResult.AclBindingDeleteResult
|
|||
import org.apache.kafka.server.authorizer._
|
||||
import org.apache.zookeeper.client.ZKClientConfig
|
||||
|
||||
import scala.collection.mutable
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.collection.{mutable, Seq}
|
||||
import scala.jdk.CollectionConverters._
|
||||
import scala.util.{Failure, Random, Success, Try}
|
||||
|
||||
object AclAuthorizer {
|
||||
|
@ -62,7 +62,7 @@ object AclAuthorizer {
|
|||
def exists: Boolean = zkVersion != ZkVersion.UnknownVersion
|
||||
}
|
||||
|
||||
class AclBuffers(classes: mutable.Buffer[AclEntry]*) {
|
||||
class AclSeqs(classes: Seq[AclEntry]*) {
|
||||
def find(p: AclEntry => Boolean): Option[AclEntry] = classes.flatMap(_.find(p)).headOption
|
||||
def isEmpty: Boolean = !classes.exists(_.nonEmpty)
|
||||
}
|
||||
|
@ -255,8 +255,9 @@ class AclAuthorizer extends Authorizer with Logging {
|
|||
}
|
||||
}
|
||||
}
|
||||
val deletedResult = deletedBindings.groupBy(_._2)
|
||||
.mapValues(_.map { case (binding, _) => new AclBindingDeleteResult(binding, deleteExceptions.getOrElse(binding, null)) })
|
||||
val deletedResult = deletedBindings.groupBy(_._2).map { case (k, bindings) =>
|
||||
k -> bindings.keys.map { binding => new AclBindingDeleteResult(binding, deleteExceptions.get(binding).orNull) }
|
||||
}
|
||||
(0 until aclBindingFilters.size).map { i =>
|
||||
new AclDeleteResult(deletedResult.getOrElse(i, Set.empty[AclBindingDeleteResult]).toSet.asJava)
|
||||
}.map(CompletableFuture.completedFuture[AclDeleteResult]).asJava
|
||||
|
@ -295,7 +296,7 @@ class AclAuthorizer extends Authorizer with Logging {
|
|||
val host = requestContext.clientAddress.getHostAddress
|
||||
val operation = action.operation
|
||||
|
||||
def isEmptyAclAndAuthorized(acls: AclBuffers): Boolean = {
|
||||
def isEmptyAclAndAuthorized(acls: AclSeqs): Boolean = {
|
||||
if (acls.isEmpty) {
|
||||
// No ACLs found for this resource, permission is determined by value of config allow.everyone.if.no.acl.found
|
||||
authorizerLogger.debug(s"No acl found for resource $resource, authorized = $shouldAllowEveryoneIfNoAclIsFound")
|
||||
|
@ -303,12 +304,12 @@ class AclAuthorizer extends Authorizer with Logging {
|
|||
} else false
|
||||
}
|
||||
|
||||
def denyAclExists(acls: AclBuffers): Boolean = {
|
||||
def denyAclExists(acls: AclSeqs): Boolean = {
|
||||
// Check if there are any Deny ACLs which would forbid this operation.
|
||||
matchingAclExists(operation, resource, principal, host, DENY, acls)
|
||||
}
|
||||
|
||||
def allowAclExists(acls: AclBuffers): Boolean = {
|
||||
def allowAclExists(acls: AclSeqs): Boolean = {
|
||||
// Check if there are any Allow ACLs which would allow this operation.
|
||||
// Allowing read, write, delete, or alter implies allowing describe.
|
||||
// See #{org.apache.kafka.common.acl.AclOperation} for more details about ACL inheritance.
|
||||
|
@ -341,7 +342,7 @@ class AclAuthorizer extends Authorizer with Logging {
|
|||
} else false
|
||||
}
|
||||
|
||||
private def matchingAcls(resourceType: ResourceType, resourceName: String): AclBuffers = {
|
||||
private def matchingAcls(resourceType: ResourceType, resourceName: String): AclSeqs = {
|
||||
// save aclCache reference to a local val to get a consistent view of the cache during acl updates.
|
||||
val aclCacheSnapshot = aclCache
|
||||
val wildcard = aclCacheSnapshot.get(new ResourcePattern(resourceType, ResourcePattern.WILDCARD_RESOURCE, PatternType.LITERAL))
|
||||
|
@ -355,12 +356,10 @@ class AclAuthorizer extends Authorizer with Logging {
|
|||
val prefixed = aclCacheSnapshot
|
||||
.from(new ResourcePattern(resourceType, resourceName, PatternType.PREFIXED))
|
||||
.to(new ResourcePattern(resourceType, resourceName.take(1), PatternType.PREFIXED))
|
||||
.filterKeys(resource => resourceName.startsWith(resource.name))
|
||||
.values
|
||||
.flatMap { _.acls.toBuffer }
|
||||
.flatMap { case (resource, acls) => if (resourceName.startsWith(resource.name)) acls.acls else Seq.empty }
|
||||
.toBuffer
|
||||
|
||||
new AclBuffers(prefixed, wildcard, literal)
|
||||
new AclSeqs(prefixed, wildcard, literal)
|
||||
}
|
||||
|
||||
private def matchingAclExists(operation: AclOperation,
|
||||
|
@ -368,7 +367,7 @@ class AclAuthorizer extends Authorizer with Logging {
|
|||
principal: KafkaPrincipal,
|
||||
host: String,
|
||||
permissionType: AclPermissionType,
|
||||
acls: AclBuffers): Boolean = {
|
||||
acls: AclSeqs): Boolean = {
|
||||
acls.find { acl =>
|
||||
acl.permissionType == permissionType &&
|
||||
(acl.kafkaPrincipal == principal || acl.kafkaPrincipal == AclEntry.WildcardPrincipal) &&
|
||||
|
|
|
@ -25,7 +25,7 @@ import org.apache.kafka.common.resource.{ResourcePattern, ResourceType}
|
|||
import org.apache.kafka.common.security.auth.KafkaPrincipal
|
||||
import org.apache.kafka.common.utils.SecurityUtils
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.jdk.CollectionConverters._
|
||||
|
||||
object AclEntry {
|
||||
val WildcardPrincipal: KafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "*")
|
||||
|
|
|
@ -33,7 +33,7 @@ import org.apache.kafka.common.utils.SecurityUtils.parseKafkaPrincipal
|
|||
import org.apache.kafka.server.authorizer.AclDeleteResult.AclBindingDeleteResult
|
||||
import org.apache.kafka.server.authorizer.{AuthorizableRequestContext, AuthorizerServerInfo, _}
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.jdk.CollectionConverters._
|
||||
import scala.collection.mutable.ArrayBuffer
|
||||
import scala.collection.{Seq, immutable, mutable}
|
||||
import scala.util.{Failure, Success, Try}
|
||||
|
@ -175,4 +175,4 @@ class AuthorizerWrapper(private[kafka] val baseAuthorizer: kafka.security.auth.A
|
|||
override def close(): Unit = {
|
||||
baseAuthorizer.close()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -32,7 +32,7 @@ import kafka.utils.CoreUtils.inLock
|
|||
import org.apache.kafka.common.protocol.Errors
|
||||
|
||||
import scala.collection.{mutable, Map, Set}
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.jdk.CollectionConverters._
|
||||
import java.util.concurrent.TimeUnit
|
||||
import java.util.concurrent.atomic.AtomicLong
|
||||
|
||||
|
|
|
@ -46,7 +46,7 @@ import org.apache.kafka.common.requests.{AlterConfigsRequest, ApiError, Describe
|
|||
import org.apache.kafka.common.utils.Sanitizer
|
||||
|
||||
import scala.collection.{Map, mutable, _}
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.jdk.CollectionConverters._
|
||||
|
||||
class AdminManager(val config: KafkaConfig,
|
||||
val metrics: Metrics,
|
||||
|
@ -518,8 +518,9 @@ class AdminManager(val config: KafkaConfig,
|
|||
configs.map { case (resource, alterConfigOps) =>
|
||||
try {
|
||||
// throw InvalidRequestException if any duplicate keys
|
||||
val duplicateKeys = alterConfigOps.groupBy(config => config.configEntry().name())
|
||||
.mapValues(_.size).filter(_._2 > 1).keys.toSet
|
||||
val duplicateKeys = alterConfigOps.groupBy(config => config.configEntry.name).filter { case (_, v) =>
|
||||
v.size > 1
|
||||
}.keySet
|
||||
if (duplicateKeys.nonEmpty)
|
||||
throw new InvalidRequestException(s"Error due to duplicate config keys : ${duplicateKeys.mkString(",")}")
|
||||
val nullUpdates = alterConfigOps
|
||||
|
@ -773,7 +774,7 @@ class AdminManager(val config: KafkaConfig,
|
|||
}
|
||||
|
||||
def handleDescribeClientQuotas(userComponent: Option[ClientQuotaFilterComponent],
|
||||
clientIdComponent: Option[ClientQuotaFilterComponent], strict: Boolean) = {
|
||||
clientIdComponent: Option[ClientQuotaFilterComponent], strict: Boolean): Map[ClientQuotaEntity, Map[String, Double]] = {
|
||||
|
||||
def toOption(opt: java.util.Optional[String]): Option[String] =
|
||||
if (opt == null)
|
||||
|
|
|
@ -66,7 +66,7 @@ class BrokerMetadataCheckpoint(val file: File) extends Logging {
|
|||
}
|
||||
|
||||
def read(): Option[BrokerMetadata] = {
|
||||
Files.deleteIfExists(new File(file + ".tmp").toPath()) // try to delete any existing temp files for cleanliness
|
||||
Files.deleteIfExists(new File(file.getPath + ".tmp").toPath()) // try to delete any existing temp files for cleanliness
|
||||
|
||||
lock synchronized {
|
||||
try {
|
||||
|
|
|
@ -32,7 +32,7 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal
|
|||
import org.apache.kafka.common.utils.{Sanitizer, Time}
|
||||
import org.apache.kafka.server.quota.{ClientQuotaCallback, ClientQuotaEntity, ClientQuotaType}
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.jdk.CollectionConverters._
|
||||
|
||||
/**
|
||||
* Represents the sensors aggregated per client
|
||||
|
@ -162,7 +162,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
|
|||
private val time: Time,
|
||||
threadNamePrefix: String,
|
||||
clientQuotaCallback: Option[ClientQuotaCallback] = None) extends Logging {
|
||||
private val staticConfigClientIdQuota = Quota.upperBound(config.quotaBytesPerSecondDefault)
|
||||
private val staticConfigClientIdQuota = Quota.upperBound(config.quotaBytesPerSecondDefault.toDouble)
|
||||
private val clientQuotaType = quotaTypeToClientQuotaType(quotaType)
|
||||
@volatile private var quotaTypesEnabled = clientQuotaCallback match {
|
||||
case Some(_) => QuotaTypes.CustomQuotas
|
||||
|
@ -176,7 +176,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
|
|||
private[server] val throttledChannelReaper = new ThrottledChannelReaper(delayQueue, threadNamePrefix)
|
||||
private val quotaCallback = clientQuotaCallback.getOrElse(new DefaultQuotaCallback)
|
||||
|
||||
private val delayQueueSensor = metrics.sensor(quotaType + "-delayQueue")
|
||||
private val delayQueueSensor = metrics.sensor(quotaType.toString + "-delayQueue")
|
||||
delayQueueSensor.add(metrics.metricName("queue-size",
|
||||
quotaType.toString,
|
||||
"Tracks the size of the delay queue"), new CumulativeSum())
|
||||
|
@ -506,8 +506,8 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
|
|||
}
|
||||
} else {
|
||||
val quotaMetricName = clientRateMetricName(Map.empty)
|
||||
allMetrics.asScala.filterKeys(n => n.name == quotaMetricName.name && n.group == quotaMetricName.group).foreach {
|
||||
case (metricName, metric) =>
|
||||
allMetrics.asScala.foreach { case (metricName, metric) =>
|
||||
if (metricName.name == quotaMetricName.name && metricName.group == quotaMetricName.group) {
|
||||
val metricTags = metricName.tags
|
||||
Option(quotaLimit(metricTags)).foreach { newQuota =>
|
||||
if (newQuota != metric.config.quota.bound) {
|
||||
|
@ -515,6 +515,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
|
|||
metric.config(getQuotaMetricConfig(newQuota))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -24,7 +24,7 @@ import org.apache.kafka.common.metrics._
|
|||
import org.apache.kafka.common.utils.Time
|
||||
import org.apache.kafka.server.quota.ClientQuotaCallback
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.jdk.CollectionConverters._
|
||||
|
||||
|
||||
class ClientRequestQuotaManager(private val config: ClientQuotaManagerConfig,
|
||||
|
|
|
@ -33,7 +33,7 @@ import org.apache.kafka.common.metrics.Quota
|
|||
import org.apache.kafka.common.metrics.Quota._
|
||||
import org.apache.kafka.common.utils.Sanitizer
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.jdk.CollectionConverters._
|
||||
import scala.collection.Seq
|
||||
import scala.util.Try
|
||||
|
||||
|
@ -128,13 +128,13 @@ class QuotaConfigHandler(private val quotaManagers: QuotaManagers) {
|
|||
val clientId = sanitizedClientId.map(Sanitizer.desanitize)
|
||||
val producerQuota =
|
||||
if (config.containsKey(DynamicConfig.Client.ProducerByteRateOverrideProp))
|
||||
Some(new Quota(config.getProperty(DynamicConfig.Client.ProducerByteRateOverrideProp).toLong, true))
|
||||
Some(new Quota(config.getProperty(DynamicConfig.Client.ProducerByteRateOverrideProp).toLong.toDouble, true))
|
||||
else
|
||||
None
|
||||
quotaManagers.produce.updateQuota(sanitizedUser, clientId, sanitizedClientId, producerQuota)
|
||||
val consumerQuota =
|
||||
if (config.containsKey(DynamicConfig.Client.ConsumerByteRateOverrideProp))
|
||||
Some(new Quota(config.getProperty(DynamicConfig.Client.ConsumerByteRateOverrideProp).toLong, true))
|
||||
Some(new Quota(config.getProperty(DynamicConfig.Client.ConsumerByteRateOverrideProp).toLong.toDouble, true))
|
||||
else
|
||||
None
|
||||
quotaManagers.fetch.updateQuota(sanitizedUser, clientId, sanitizedClientId, consumerQuota)
|
||||
|
@ -197,9 +197,9 @@ class BrokerConfigHandler(private val brokerConfig: KafkaConfig,
|
|||
brokerConfig.dynamicConfig.updateDefaultConfig(properties)
|
||||
else if (brokerConfig.brokerId == brokerId.trim.toInt) {
|
||||
brokerConfig.dynamicConfig.updateBrokerConfig(brokerConfig.brokerId, properties)
|
||||
quotaManagers.leader.updateQuota(upperBound(getOrDefault(LeaderReplicationThrottledRateProp)))
|
||||
quotaManagers.follower.updateQuota(upperBound(getOrDefault(FollowerReplicationThrottledRateProp)))
|
||||
quotaManagers.alterLogDirs.updateQuota(upperBound(getOrDefault(ReplicaAlterLogDirsIoMaxBytesPerSecondProp)))
|
||||
quotaManagers.leader.updateQuota(upperBound(getOrDefault(LeaderReplicationThrottledRateProp).toDouble))
|
||||
quotaManagers.follower.updateQuota(upperBound(getOrDefault(FollowerReplicationThrottledRateProp).toDouble))
|
||||
quotaManagers.alterLogDirs.updateQuota(upperBound(getOrDefault(ReplicaAlterLogDirsIoMaxBytesPerSecondProp).toDouble))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -34,7 +34,7 @@ class DelayedElectLeader(
|
|||
responseCallback: Map[TopicPartition, ApiError] => Unit
|
||||
) extends DelayedOperation(delayMs) {
|
||||
|
||||
private var waitingPartitions = expectedLeaders
|
||||
private val waitingPartitions = mutable.Map() ++= expectedLeaders
|
||||
private val fullResults = mutable.Map() ++= results
|
||||
|
||||
|
||||
|
@ -52,7 +52,7 @@ class DelayedElectLeader(
|
|||
updateWaiting()
|
||||
val timedOut = waitingPartitions.map {
|
||||
case (tp, _) => tp -> new ApiError(Errors.REQUEST_TIMED_OUT, null)
|
||||
}.toMap
|
||||
}
|
||||
responseCallback(timedOut ++ fullResults)
|
||||
}
|
||||
|
||||
|
|
|
@ -36,7 +36,7 @@ import org.apache.kafka.common.security.token.delegation.internals.DelegationTok
|
|||
import org.apache.kafka.common.security.token.delegation.{DelegationToken, TokenInformation}
|
||||
import org.apache.kafka.common.utils.{Sanitizer, SecurityUtils, Time}
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.jdk.CollectionConverters._
|
||||
import scala.collection.mutable
|
||||
|
||||
object DelegationTokenManager {
|
||||
|
|
|
@ -36,7 +36,7 @@ import org.apache.kafka.common.security.authenticator.LoginManager
|
|||
import org.apache.kafka.common.utils.Utils
|
||||
|
||||
import scala.collection._
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.jdk.CollectionConverters._
|
||||
|
||||
/**
|
||||
* Dynamic broker configurations are stored in ZooKeeper and may be defined at two levels:
|
||||
|
@ -85,8 +85,8 @@ object DynamicBrokerConfig {
|
|||
SocketServer.ReconfigurableConfigs
|
||||
|
||||
private val ClusterLevelListenerConfigs = Set(KafkaConfig.MaxConnectionsProp)
|
||||
private val PerBrokerConfigs = DynamicSecurityConfigs ++
|
||||
DynamicListenerConfig.ReconfigurableConfigs -- ClusterLevelListenerConfigs
|
||||
private val PerBrokerConfigs = (DynamicSecurityConfigs ++ DynamicListenerConfig.ReconfigurableConfigs).diff(
|
||||
ClusterLevelListenerConfigs)
|
||||
private val ListenerMechanismConfigs = Set(KafkaConfig.SaslJaasConfigProp,
|
||||
KafkaConfig.SaslLoginCallbackHandlerClassProp,
|
||||
KafkaConfig.SaslLoginClassProp,
|
||||
|
@ -168,10 +168,12 @@ object DynamicBrokerConfig {
|
|||
}
|
||||
|
||||
private[server] def addDynamicConfigs(configDef: ConfigDef): Unit = {
|
||||
KafkaConfig.configKeys.filterKeys(AllDynamicConfigs.contains).values.foreach { config =>
|
||||
configDef.define(config.name, config.`type`, config.defaultValue, config.validator,
|
||||
config.importance, config.documentation, config.group, config.orderInGroup, config.width,
|
||||
config.displayName, config.dependents, config.recommender)
|
||||
KafkaConfig.configKeys.foreach { case (configName, config) =>
|
||||
if (AllDynamicConfigs.contains(configName)) {
|
||||
configDef.define(config.name, config.`type`, config.defaultValue, config.validator,
|
||||
config.importance, config.documentation, config.group, config.orderInGroup, config.width,
|
||||
config.displayName, config.dependents, config.recommender)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -352,7 +354,10 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
|
|||
props.setProperty(configName, passwordEncoder.encode(new Password(value)))
|
||||
}
|
||||
}
|
||||
configProps.asScala.filterKeys(isPasswordConfig).foreach { case (name, value) => encodePassword(name, value) }
|
||||
configProps.asScala.foreach { case (name, value) =>
|
||||
if (isPasswordConfig(name))
|
||||
encodePassword(name, value)
|
||||
}
|
||||
props
|
||||
}
|
||||
|
||||
|
@ -386,7 +391,10 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
|
|||
}
|
||||
}
|
||||
|
||||
props.asScala.filterKeys(isPasswordConfig).foreach { case (name, value) => decodePassword(name, value) }
|
||||
props.asScala.foreach { case (name, value) =>
|
||||
if (isPasswordConfig(name))
|
||||
decodePassword(name, value)
|
||||
}
|
||||
props
|
||||
}
|
||||
|
||||
|
@ -398,8 +406,8 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
|
|||
val props = persistentProps.clone().asInstanceOf[Properties]
|
||||
if (props.asScala.keySet.exists(isPasswordConfig)) {
|
||||
maybeCreatePasswordEncoder(kafkaConfig.passwordEncoderOldSecret).foreach { passwordDecoder =>
|
||||
persistentProps.asScala.filterKeys(isPasswordConfig).foreach { case (configName, value) =>
|
||||
if (value != null) {
|
||||
persistentProps.asScala.foreach { case (configName, value) =>
|
||||
if (isPasswordConfig(configName) && value != null) {
|
||||
val decoded = try {
|
||||
Some(passwordDecoder.decode(value).value)
|
||||
} catch {
|
||||
|
@ -633,7 +641,7 @@ class DynamicLogConfig(logManager: LogManager, server: KafkaServer) extends Brok
|
|||
logManager.allLogs.foreach { log =>
|
||||
val props = mutable.Map.empty[Any, Any]
|
||||
props ++= newBrokerDefaults
|
||||
props ++= log.config.originals.asScala.filterKeys(log.config.overriddenConfigs.contains)
|
||||
props ++= log.config.originals.asScala.filter { case (k, _) => log.config.overriddenConfigs.contains(k) }
|
||||
|
||||
val logConfig = LogConfig(props.asJava, log.config.overriddenConfigs)
|
||||
log.updateConfig(logConfig)
|
||||
|
@ -644,8 +652,8 @@ class DynamicLogConfig(logManager: LogManager, server: KafkaServer) extends Brok
|
|||
val currentLogConfig = logManager.currentDefaultConfig
|
||||
val origUncleanLeaderElectionEnable = logManager.currentDefaultConfig.uncleanLeaderElectionEnable
|
||||
val newBrokerDefaults = new util.HashMap[String, Object](currentLogConfig.originals)
|
||||
newConfig.valuesFromThisConfig.asScala.filterKeys(DynamicLogConfig.ReconfigurableConfigs.contains).foreach { case (k, v) =>
|
||||
if (v != null) {
|
||||
newConfig.valuesFromThisConfig.asScala.foreach { case (k, v) =>
|
||||
if (DynamicLogConfig.ReconfigurableConfigs.contains(k) && v != null) {
|
||||
DynamicLogConfig.KafkaConfigToLogConfigName.get(k).foreach { configName =>
|
||||
newBrokerDefaults.put(configName, v.asInstanceOf[AnyRef])
|
||||
}
|
||||
|
@ -678,17 +686,19 @@ class DynamicThreadPool(server: KafkaServer) extends BrokerReconfigurable {
|
|||
}
|
||||
|
||||
override def validateReconfiguration(newConfig: KafkaConfig): Unit = {
|
||||
newConfig.values.asScala.filterKeys(DynamicThreadPool.ReconfigurableConfigs.contains).foreach { case (k, v) =>
|
||||
val newValue = v.asInstanceOf[Int]
|
||||
val oldValue = currentValue(k)
|
||||
if (newValue != oldValue) {
|
||||
val errorMsg = s"Dynamic thread count update validation failed for $k=$v"
|
||||
if (newValue <= 0)
|
||||
throw new ConfigException(s"$errorMsg, value should be at least 1")
|
||||
if (newValue < oldValue / 2)
|
||||
throw new ConfigException(s"$errorMsg, value should be at least half the current value $oldValue")
|
||||
if (newValue > oldValue * 2)
|
||||
throw new ConfigException(s"$errorMsg, value should not be greater than double the current value $oldValue")
|
||||
newConfig.values.asScala.foreach { case (k, v) =>
|
||||
if (DynamicThreadPool.ReconfigurableConfigs.contains(k)) {
|
||||
val newValue = v.asInstanceOf[Int]
|
||||
val oldValue = currentValue(k)
|
||||
if (newValue != oldValue) {
|
||||
val errorMsg = s"Dynamic thread count update validation failed for $k=$v"
|
||||
if (newValue <= 0)
|
||||
throw new ConfigException(s"$errorMsg, value should be at least 1")
|
||||
if (newValue < oldValue / 2)
|
||||
throw new ConfigException(s"$errorMsg, value should be at least half the current value $oldValue")
|
||||
if (newValue > oldValue * 2)
|
||||
throw new ConfigException(s"$errorMsg, value should not be greater than double the current value $oldValue")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -762,7 +772,7 @@ class DynamicMetricsReporters(brokerId: Int, server: KafkaServer) extends Reconf
|
|||
|
||||
override def reconfigure(configs: util.Map[String, _]): Unit = {
|
||||
val updatedMetricsReporters = metricsReporterClasses(configs)
|
||||
val deleted = currentReporters.keySet -- updatedMetricsReporters
|
||||
val deleted = currentReporters.keySet.toSet -- updatedMetricsReporters
|
||||
deleted.foreach(removeReporter)
|
||||
currentReporters.values.foreach {
|
||||
case reporter: Reconfigurable => dynamicConfig.maybeReconfigure(reporter, dynamicConfig.currentKafkaConfig, configs)
|
||||
|
|
|
@ -26,7 +26,7 @@ import org.apache.kafka.common.config.ConfigDef.Importance._
|
|||
import org.apache.kafka.common.config.ConfigDef.Range._
|
||||
import org.apache.kafka.common.config.ConfigDef.Type._
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.jdk.CollectionConverters._
|
||||
|
||||
/**
|
||||
* Class used to hold dynamic configs. These are configs which have no physical manifestation in the server.properties
|
||||
|
|
|
@ -27,7 +27,7 @@ import org.apache.kafka.common.config.types.Password
|
|||
import org.apache.kafka.common.security.scram.internals.ScramMechanism
|
||||
import org.apache.kafka.common.utils.Time
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.jdk.CollectionConverters._
|
||||
import scala.collection._
|
||||
|
||||
/**
|
||||
|
|
|
@ -32,7 +32,7 @@ import org.apache.kafka.common.utils.{ImplicitLinkedHashCollection, Time, Utils}
|
|||
|
||||
import scala.math.Ordered.orderingToOrdered
|
||||
import scala.collection.{mutable, _}
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.jdk.CollectionConverters._
|
||||
|
||||
object FetchSession {
|
||||
type REQ_MAP = util.Map[TopicPartition, FetchRequest.PartitionData]
|
||||
|
|
|
@ -81,7 +81,7 @@ import org.apache.kafka.common.message.MetadataResponseData.{MetadataResponsePar
|
|||
import org.apache.kafka.server.authorizer._
|
||||
|
||||
import scala.compat.java8.OptionConverters._
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.jdk.CollectionConverters._
|
||||
import scala.collection.mutable.ArrayBuffer
|
||||
import scala.collection.{Map, Seq, Set, immutable, mutable}
|
||||
import scala.util.{Failure, Success, Try}
|
||||
|
@ -892,9 +892,9 @@ class KafkaApis(val requestChannel: RequestChannel,
|
|||
case (topicPartition, _) => authorizedTopics.contains(topicPartition.topic)
|
||||
}
|
||||
|
||||
val unauthorizedResponseStatus = unauthorizedRequestInfo.mapValues(_ =>
|
||||
new ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, List[JLong]().asJava)
|
||||
)
|
||||
val unauthorizedResponseStatus = unauthorizedRequestInfo.map { case (k, _) =>
|
||||
k -> new ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, Seq.empty[JLong].asJava)
|
||||
}
|
||||
|
||||
val responseMap = authorizedRequestInfo.map {case (topicPartition, partitionData) =>
|
||||
try {
|
||||
|
@ -932,12 +932,12 @@ class KafkaApis(val requestChannel: RequestChannel,
|
|||
case (topicPartition, _) => authorizedTopics.contains(topicPartition.topic)
|
||||
}
|
||||
|
||||
val unauthorizedResponseStatus = unauthorizedRequestInfo.mapValues(_ => {
|
||||
new ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED,
|
||||
val unauthorizedResponseStatus = unauthorizedRequestInfo.map { case (k, _) =>
|
||||
k -> new ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED,
|
||||
ListOffsetResponse.UNKNOWN_TIMESTAMP,
|
||||
ListOffsetResponse.UNKNOWN_OFFSET,
|
||||
Optional.empty())
|
||||
})
|
||||
}
|
||||
|
||||
val responseMap = authorizedRequestInfo.map { case (topicPartition, partitionData) =>
|
||||
if (offsetRequest.duplicatePartitions.contains(topicPartition)) {
|
||||
|
@ -1081,7 +1081,7 @@ class KafkaApis(val requestChannel: RequestChannel,
|
|||
if (topics.isEmpty || topicResponses.size == topics.size) {
|
||||
topicResponses
|
||||
} else {
|
||||
val nonExistentTopics = topics -- topicResponses.map(_.name).toSet
|
||||
val nonExistentTopics = topics.diff(topicResponses.map(_.name).toSet)
|
||||
val responsesForNonExistentTopics = nonExistentTopics.map { topic =>
|
||||
if (isInternal(topic)) {
|
||||
val topicMetadata = createInternalTopic(topic)
|
||||
|
@ -1120,8 +1120,8 @@ class KafkaApis(val requestChannel: RequestChannel,
|
|||
if (metadataRequest.allowAutoTopicCreation && config.autoCreateTopicsEnable && nonExistingTopics.nonEmpty) {
|
||||
if (!authorize(request, CREATE, CLUSTER, CLUSTER_NAME, logIfDenied = false)) {
|
||||
val authorizedForCreateTopics = filterAuthorized(request, CREATE, TOPIC, nonExistingTopics.toSeq)
|
||||
unauthorizedForCreateTopics = nonExistingTopics -- authorizedForCreateTopics
|
||||
authorizedTopics --= unauthorizedForCreateTopics
|
||||
unauthorizedForCreateTopics = nonExistingTopics.diff(authorizedForCreateTopics)
|
||||
authorizedTopics = authorizedTopics.diff(unauthorizedForCreateTopics)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -2201,7 +2201,7 @@ class KafkaApis(val requestChannel: RequestChannel,
|
|||
|
||||
// the callback for sending an offset commit response
|
||||
def sendResponseCallback(authorizedTopicErrors: Map[TopicPartition, Errors]): Unit = {
|
||||
var combinedCommitStatus = authorizedTopicErrors ++ unauthorizedTopicErrors ++ nonExistingTopicErrors
|
||||
val combinedCommitStatus = mutable.Map() ++= authorizedTopicErrors ++= unauthorizedTopicErrors ++= nonExistingTopicErrors
|
||||
if (isDebugEnabled)
|
||||
combinedCommitStatus.foreach { case (topicPartition, error) =>
|
||||
if (error != Errors.NONE) {
|
||||
|
@ -2216,10 +2216,8 @@ class KafkaApis(val requestChannel: RequestChannel,
|
|||
// txn commit protocol >= 2 (version 2.3 and onwards) are guaranteed to have
|
||||
// the fix to check for the loading error.
|
||||
if (txnOffsetCommitRequest.version < 2) {
|
||||
combinedCommitStatus.foreach { case (topicPartition, error) =>
|
||||
if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
|
||||
combinedCommitStatus += topicPartition -> Errors.COORDINATOR_NOT_AVAILABLE
|
||||
}
|
||||
combinedCommitStatus ++= combinedCommitStatus.collect {
|
||||
case (tp, error) if error == Errors.COORDINATOR_LOAD_IN_PROGRESS => tp -> Errors.COORDINATOR_NOT_AVAILABLE
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2370,9 +2368,10 @@ class KafkaApis(val requestChannel: RequestChannel,
|
|||
}
|
||||
|
||||
val endOffsetsForAuthorizedPartitions = replicaManager.lastOffsetForLeaderEpoch(authorizedPartitions)
|
||||
val endOffsetsForUnauthorizedPartitions = unauthorizedPartitions.mapValues(_ =>
|
||||
new EpochEndOffset(Errors.TOPIC_AUTHORIZATION_FAILED, EpochEndOffset.UNDEFINED_EPOCH,
|
||||
EpochEndOffset.UNDEFINED_EPOCH_OFFSET))
|
||||
val endOffsetsForUnauthorizedPartitions = unauthorizedPartitions.map { case (k, _) =>
|
||||
k -> new EpochEndOffset(Errors.TOPIC_AUTHORIZATION_FAILED, EpochEndOffset.UNDEFINED_EPOCH,
|
||||
EpochEndOffset.UNDEFINED_EPOCH_OFFSET)
|
||||
}
|
||||
|
||||
val endOffsetsForAllPartitions = endOffsetsForAuthorizedPartitions ++ endOffsetsForUnauthorizedPartitions
|
||||
sendResponseMaybeThrottle(request, requestThrottleMs =>
|
||||
|
@ -2844,8 +2843,9 @@ class KafkaApis(val requestChannel: RequestChannel,
|
|||
val describeClientQuotasRequest = request.body[DescribeClientQuotasRequest]
|
||||
|
||||
if (authorize(request, DESCRIBE_CONFIGS, CLUSTER, CLUSTER_NAME)) {
|
||||
val result = adminManager.describeClientQuotas(
|
||||
describeClientQuotasRequest.filter).mapValues(_.mapValues(Double.box).toMap.asJava).toMap.asJava
|
||||
val result = adminManager.describeClientQuotas(describeClientQuotasRequest.filter).map { case (quotaEntity, quotaConfigs) =>
|
||||
quotaEntity -> quotaConfigs.map { case (key, value) => key -> Double.box(value) }.asJava
|
||||
}.asJava
|
||||
sendResponseMaybeThrottle(request, requestThrottleMs =>
|
||||
new DescribeClientQuotasResponse(result, requestThrottleMs))
|
||||
} else {
|
||||
|
@ -2890,15 +2890,16 @@ class KafkaApis(val requestChannel: RequestChannel,
|
|||
logIfDenied: Boolean = true): Set[String] = {
|
||||
authorizer match {
|
||||
case Some(authZ) =>
|
||||
val resources = resourceNames.groupBy(identity).mapValues(_.size).toList
|
||||
val actions = resources.map { case (resourceName, count) =>
|
||||
val groupedResourceNames = resourceNames.groupBy(identity)
|
||||
val actions = resourceNames.map { resourceName =>
|
||||
val count = groupedResourceNames(resourceName).size
|
||||
val resource = new ResourcePattern(resourceType, resourceName, PatternType.LITERAL)
|
||||
new Action(operation, resource, count, logIfAllowed, logIfDenied)
|
||||
}
|
||||
authZ.authorize(request.context, actions.asJava).asScala
|
||||
.zip(resources.map(_._1)) // zip with resource name
|
||||
.filter(_._1 == AuthorizationResult.ALLOWED) // filter authorized resources
|
||||
.map(_._2).toSet
|
||||
.zip(resourceNames)
|
||||
.filter { case (authzResult, _) => authzResult == AuthorizationResult.ALLOWED }
|
||||
.map { case (_, resourceName) => resourceName }.toSet
|
||||
case None =>
|
||||
resourceNames.toSet
|
||||
}
|
||||
|
|
|
@ -43,7 +43,7 @@ import org.apache.kafka.common.utils.Utils
|
|||
import org.apache.kafka.server.authorizer.Authorizer
|
||||
import org.apache.zookeeper.client.ZKClientConfig
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.jdk.CollectionConverters._
|
||||
import scala.collection.{Map, Seq}
|
||||
|
||||
object Defaults {
|
||||
|
@ -239,7 +239,7 @@ object Defaults {
|
|||
|
||||
/** ********* Sasl configuration ***********/
|
||||
val SaslMechanismInterBrokerProtocol = SaslConfigs.DEFAULT_SASL_MECHANISM
|
||||
val SaslEnabledMechanisms = SaslConfigs.DEFAULT_SASL_ENABLED_MECHANISMS
|
||||
val SaslEnabledMechanisms = BrokerSecurityConfigs.DEFAULT_SASL_ENABLED_MECHANISMS
|
||||
val SaslKerberosKinitCmd = SaslConfigs.DEFAULT_KERBEROS_KINIT_CMD
|
||||
val SaslKerberosTicketRenewWindowFactor = SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_WINDOW_FACTOR
|
||||
val SaslKerberosTicketRenewJitter = SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_JITTER
|
||||
|
|
|
@ -28,7 +28,7 @@ import org.apache.kafka.common.internals.FatalExitError
|
|||
import org.apache.kafka.common.utils.{KafkaThread, Time}
|
||||
|
||||
import scala.collection.mutable
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.jdk.CollectionConverters._
|
||||
|
||||
/**
|
||||
* A thread that answers kafka requests.
|
||||
|
|
|
@ -50,7 +50,7 @@ import org.apache.kafka.common.{ClusterResource, Endpoint, Node}
|
|||
import org.apache.kafka.server.authorizer.Authorizer
|
||||
import org.apache.zookeeper.client.ZKClientConfig
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.jdk.CollectionConverters._
|
||||
import scala.collection.{Map, Seq, mutable}
|
||||
|
||||
object KafkaServer {
|
||||
|
@ -307,9 +307,13 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
|
|||
authorizer.foreach(_.configure(config.originals))
|
||||
val authorizerFutures: Map[Endpoint, CompletableFuture[Void]] = authorizer match {
|
||||
case Some(authZ) =>
|
||||
authZ.start(brokerInfo.broker.toServerInfo(clusterId, config)).asScala.mapValues(_.toCompletableFuture).toMap
|
||||
authZ.start(brokerInfo.broker.toServerInfo(clusterId, config)).asScala.map { case (ep, cs) =>
|
||||
ep -> cs.toCompletableFuture
|
||||
}
|
||||
case None =>
|
||||
brokerInfo.broker.endPoints.map { ep => ep.toJava -> CompletableFuture.completedFuture[Void](null) }.toMap
|
||||
brokerInfo.broker.endPoints.map { ep =>
|
||||
ep.toJava -> CompletableFuture.completedFuture[Void](null)
|
||||
}.toMap
|
||||
}
|
||||
|
||||
val fetchManager = new FetchManager(Time.SYSTEM,
|
||||
|
@ -726,7 +730,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
|
|||
}
|
||||
|
||||
if (brokerMetadataSet.size > 1) {
|
||||
val builder = StringBuilder.newBuilder
|
||||
val builder = new StringBuilder
|
||||
|
||||
for ((logDir, brokerMetadata) <- brokerMetadataMap)
|
||||
builder ++= s"- $logDir -> $brokerMetadata\n"
|
||||
|
|
|
@ -22,7 +22,7 @@ import java.util.{Collections}
|
|||
import java.util.concurrent.locks.ReentrantReadWriteLock
|
||||
|
||||
import scala.collection.{mutable, Seq, Set}
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.jdk.CollectionConverters._
|
||||
import kafka.cluster.{Broker, EndPoint}
|
||||
import kafka.api._
|
||||
import kafka.controller.StateChangeLogger
|
||||
|
@ -199,7 +199,7 @@ class MetadataCache(brokerId: Int) extends Logging {
|
|||
}
|
||||
|
||||
def getNonExistingTopics(topics: Set[String]): Set[String] = {
|
||||
topics -- metadataSnapshot.partitionStates.keySet
|
||||
topics.diff(metadataSnapshot.partitionStates.keySet)
|
||||
}
|
||||
|
||||
def getAliveBroker(brokerId: Int): Option[Broker] = {
|
||||
|
|
|
@ -34,7 +34,7 @@ import org.apache.kafka.common.requests.EpochEndOffset._
|
|||
import org.apache.kafka.common.requests.FetchResponse.PartitionData
|
||||
import org.apache.kafka.common.requests.{EpochEndOffset, FetchRequest, FetchResponse}
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.jdk.CollectionConverters._
|
||||
import scala.collection.{mutable, Map, Seq, Set}
|
||||
|
||||
class ReplicaAlterLogDirsThread(name: String,
|
||||
|
|
|
@ -29,7 +29,7 @@ import org.apache.kafka.clients.{ApiVersions, ClientResponse, ManualMetadataUpda
|
|||
import org.apache.kafka.common.{Node, Reconfigurable}
|
||||
import org.apache.kafka.common.requests.AbstractRequest.Builder
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.jdk.CollectionConverters._
|
||||
|
||||
trait BlockingSend {
|
||||
|
||||
|
|
|
@ -34,7 +34,7 @@ import org.apache.kafka.common.requests.EpochEndOffset._
|
|||
import org.apache.kafka.common.requests._
|
||||
import org.apache.kafka.common.utils.{LogContext, Time}
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.jdk.CollectionConverters._
|
||||
import scala.collection.{mutable, Map}
|
||||
|
||||
class ReplicaFetcherThread(name: String,
|
||||
|
|
|
@ -57,7 +57,7 @@ import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
|
|||
import org.apache.kafka.common.requests._
|
||||
import org.apache.kafka.common.utils.Time
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.jdk.CollectionConverters._
|
||||
import scala.collection.{Map, Seq, Set, mutable}
|
||||
import scala.compat.java8.OptionConverters._
|
||||
|
||||
|
@ -181,7 +181,7 @@ class ReplicaManager(val config: KafkaConfig,
|
|||
brokerTopicStats: BrokerTopicStats,
|
||||
metadataCache: MetadataCache,
|
||||
logDirFailureChannel: LogDirFailureChannel,
|
||||
threadNamePrefix: Option[String] = None) {
|
||||
threadNamePrefix: Option[String] = None) = {
|
||||
this(config, metrics, time, zkClient, scheduler, logManager, isShuttingDown,
|
||||
quotaManagers, brokerTopicStats, metadataCache, logDirFailureChannel,
|
||||
DelayedOperationPurgatory[DelayedProduce](
|
||||
|
@ -1256,14 +1256,14 @@ class ReplicaManager(val config: KafkaConfig,
|
|||
}
|
||||
}
|
||||
|
||||
val partitionsTobeLeader = partitionStates.filter { case (_, partitionState) =>
|
||||
val partitionsToBeLeader = partitionStates.filter { case (_, partitionState) =>
|
||||
partitionState.leader == localBrokerId
|
||||
}
|
||||
val partitionsToBeFollower = partitionStates -- partitionsTobeLeader.keys
|
||||
val partitionsToBeFollower = partitionStates.filter { case (k, _) => !partitionsToBeLeader.contains(k) }
|
||||
|
||||
val highWatermarkCheckpoints = new LazyOffsetCheckpoints(this.highWatermarkCheckpoints)
|
||||
val partitionsBecomeLeader = if (partitionsTobeLeader.nonEmpty)
|
||||
makeLeaders(controllerId, controllerEpoch, partitionsTobeLeader, correlationId, responseMap,
|
||||
val partitionsBecomeLeader = if (partitionsToBeLeader.nonEmpty)
|
||||
makeLeaders(controllerId, controllerEpoch, partitionsToBeLeader, correlationId, responseMap,
|
||||
highWatermarkCheckpoints)
|
||||
else
|
||||
Set.empty[Partition]
|
||||
|
|
|
@ -134,7 +134,7 @@ class ReplicationQuotaManager(val config: ReplicationQuotaManagerConfig,
|
|||
*/
|
||||
def record(value: Long): Unit = {
|
||||
try {
|
||||
sensor().record(value)
|
||||
sensor().record(value.toDouble)
|
||||
} catch {
|
||||
case qve: QuotaViolationException =>
|
||||
trace(s"Record: Quota violated, but ignored, for sensor (${sensor.name}), metric: (${qve.metricName}), value : (${qve.value}), bound: (${qve.bound}), recordedValue ($value)")
|
||||
|
|
|
@ -37,7 +37,7 @@ import org.apache.kafka.common.requests.ListOffsetRequest
|
|||
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, Deserializer}
|
||||
import org.apache.kafka.common.utils.Utils
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.jdk.CollectionConverters._
|
||||
|
||||
/**
|
||||
* Consumer that dumps messages to standard out.
|
||||
|
|
|
@ -31,7 +31,7 @@ import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, Produce
|
|||
import org.apache.kafka.common.KafkaException
|
||||
import org.apache.kafka.common.utils.Utils
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.jdk.CollectionConverters._
|
||||
|
||||
object ConsoleProducer {
|
||||
|
||||
|
|
|
@ -30,7 +30,7 @@ import org.apache.kafka.common.serialization.ByteArrayDeserializer
|
|||
import org.apache.kafka.common.utils.Utils
|
||||
import org.apache.kafka.common.{Metric, MetricName, TopicPartition}
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.jdk.CollectionConverters._
|
||||
import scala.collection.mutable
|
||||
|
||||
/**
|
||||
|
@ -175,7 +175,7 @@ object ConsumerPerformance extends LazyLogging {
|
|||
startMs: Long,
|
||||
endMs: Long,
|
||||
dateFormat: SimpleDateFormat): Unit = {
|
||||
val elapsedMs: Double = endMs - startMs
|
||||
val elapsedMs: Double = (endMs - startMs).toDouble
|
||||
val totalMbRead = (bytesRead * 1.0) / (1024 * 1024)
|
||||
val intervalMbRead = ((bytesRead - lastBytesRead) * 1.0) / (1024 * 1024)
|
||||
val intervalMbPerSec = 1000.0 * intervalMbRead / elapsedMs
|
||||
|
|
|
@ -27,7 +27,7 @@ import kafka.utils._
|
|||
import org.apache.kafka.common.record._
|
||||
import org.apache.kafka.common.utils.Utils
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.jdk.CollectionConverters._
|
||||
import scala.collection.mutable
|
||||
import scala.collection.mutable.ArrayBuffer
|
||||
|
||||
|
|
|
@ -29,7 +29,7 @@ import org.apache.kafka.clients.producer._
|
|||
import org.apache.kafka.common.TopicPartition
|
||||
import org.apache.kafka.common.utils.Utils
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.jdk.CollectionConverters._
|
||||
import scala.util.Random
|
||||
|
||||
|
||||
|
@ -147,7 +147,7 @@ object EndToEndLatency {
|
|||
|
||||
//Report progress
|
||||
if (i % 1000 == 0)
|
||||
println(i + "\t" + elapsed / 1000.0 / 1000.0)
|
||||
println(i.toString + "\t" + elapsed / 1000.0 / 1000.0)
|
||||
totalTime += elapsed
|
||||
latencies(i) = elapsed / 1000 / 1000
|
||||
}
|
||||
|
|
|
@ -27,7 +27,7 @@ import org.apache.kafka.common.{PartitionInfo, TopicPartition}
|
|||
import org.apache.kafka.common.requests.ListOffsetRequest
|
||||
import org.apache.kafka.common.serialization.ByteArrayDeserializer
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.jdk.CollectionConverters._
|
||||
import scala.collection.Seq
|
||||
|
||||
object GetOffsetShell {
|
||||
|
@ -143,7 +143,7 @@ object GetOffsetShell {
|
|||
* Return the partition infos for `topic`. If the topic does not exist, `None` is returned.
|
||||
*/
|
||||
private def listPartitionInfos(consumer: KafkaConsumer[_, _], topic: String, partitionIds: Set[Int]): Option[Seq[PartitionInfo]] = {
|
||||
val partitionInfos = consumer.listTopics.asScala.filterKeys(_ == topic).values.flatMap(_.asScala).toBuffer
|
||||
val partitionInfos = consumer.listTopics.asScala.filter { case (k, _) => k == topic }.values.flatMap(_.asScala).toBuffer
|
||||
if (partitionInfos.isEmpty)
|
||||
None
|
||||
else if (partitionIds.isEmpty)
|
||||
|
|
|
@ -26,7 +26,7 @@ import javax.rmi.ssl.SslRMIClientSocketFactory
|
|||
|
||||
import joptsimple.OptionParser
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.jdk.CollectionConverters._
|
||||
import scala.collection.mutable
|
||||
import scala.math._
|
||||
import kafka.utils.{CommandLineUtils, Exit, Logging}
|
||||
|
@ -258,8 +258,8 @@ object JmxTool extends Logging {
|
|||
attributesWhitelist match {
|
||||
case Some(allowedAttributes) =>
|
||||
if (allowedAttributes.contains(attr.getName))
|
||||
attributes(name + ":" + attr.getName) = attr.getValue
|
||||
case None => attributes(name + ":" + attr.getName) = attr.getValue
|
||||
attributes(name.toString + ":" + attr.getName) = attr.getValue
|
||||
case None => attributes(name.toString + ":" + attr.getName) = attr.getValue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -36,7 +36,7 @@ import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySe
|
|||
import org.apache.kafka.common.utils.{Time, Utils}
|
||||
import org.apache.kafka.common.{KafkaException, TopicPartition}
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.jdk.CollectionConverters._
|
||||
import scala.collection.mutable.HashMap
|
||||
import scala.util.control.ControlThrowable
|
||||
import scala.util.{Failure, Success, Try}
|
||||
|
@ -134,7 +134,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
|
|||
case _: TimeoutException =>
|
||||
Try(consumerWrapper.consumer.listTopics) match {
|
||||
case Success(visibleTopics) =>
|
||||
consumerWrapper.offsets.retain((tp, _) => visibleTopics.containsKey(tp.topic))
|
||||
consumerWrapper.offsets --= consumerWrapper.offsets.keySet.filter(tp => !visibleTopics.containsKey(tp.topic))
|
||||
case Failure(e) =>
|
||||
warn("Failed to list all authorized topics after committing offsets timed out: ", e)
|
||||
}
|
||||
|
|
|
@ -42,7 +42,7 @@ import org.apache.kafka.common.serialization.StringDeserializer
|
|||
import org.apache.kafka.common.utils.{LogContext, Time}
|
||||
import org.apache.kafka.common.{Node, TopicPartition}
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.jdk.CollectionConverters._
|
||||
import scala.collection.Seq
|
||||
|
||||
/**
|
||||
|
@ -225,16 +225,16 @@ object ReplicaVerificationTool extends Logging {
|
|||
}
|
||||
|
||||
private def initialOffsets(topicPartitions: Seq[TopicPartition], consumerConfig: Properties,
|
||||
initialOffsetTime: Long): Map[TopicPartition, Long] = {
|
||||
initialOffsetTime: Long): collection.Map[TopicPartition, Long] = {
|
||||
val consumer = createConsumer(consumerConfig)
|
||||
try {
|
||||
if (ListOffsetRequest.LATEST_TIMESTAMP == initialOffsetTime)
|
||||
consumer.endOffsets(topicPartitions.asJava).asScala.mapValues(_.longValue).toMap
|
||||
consumer.endOffsets(topicPartitions.asJava).asScala.map { case (k, v) => k -> v.longValue }
|
||||
else if (ListOffsetRequest.EARLIEST_TIMESTAMP == initialOffsetTime)
|
||||
consumer.beginningOffsets(topicPartitions.asJava).asScala.mapValues(_.longValue).toMap
|
||||
consumer.beginningOffsets(topicPartitions.asJava).asScala.map { case (k, v) => k -> v.longValue }
|
||||
else {
|
||||
val timestampsToSearch = topicPartitions.map(tp => tp -> (initialOffsetTime: java.lang.Long)).toMap
|
||||
consumer.offsetsForTimes(timestampsToSearch.asJava).asScala.mapValues(v => v.offset).toMap
|
||||
consumer.offsetsForTimes(timestampsToSearch.asJava).asScala.map { case (k, v) => k -> v.offset }
|
||||
}
|
||||
} finally consumer.close()
|
||||
}
|
||||
|
@ -256,8 +256,8 @@ private case class TopicPartitionReplica(topic: String, partitionId: Int, replic
|
|||
|
||||
private case class MessageInfo(replicaId: Int, offset: Long, nextOffset: Long, checksum: Long)
|
||||
|
||||
private class ReplicaBuffer(expectedReplicasPerTopicPartition: Map[TopicPartition, Int],
|
||||
initialOffsets: Map[TopicPartition, Long],
|
||||
private class ReplicaBuffer(expectedReplicasPerTopicPartition: collection.Map[TopicPartition, Int],
|
||||
initialOffsets: collection.Map[TopicPartition, Long],
|
||||
expectedNumFetchers: Int,
|
||||
reportInterval: Long) extends Logging {
|
||||
private val fetchOffsetMap = new Pool[TopicPartition, Long]
|
||||
|
@ -358,8 +358,8 @@ private class ReplicaBuffer(expectedReplicasPerTopicPartition: Map[TopicPartitio
|
|||
if (isMessageInAllReplicas) {
|
||||
val nextOffset = messageInfoFromFirstReplicaOpt.get.nextOffset
|
||||
fetchOffsetMap.put(topicPartition, nextOffset)
|
||||
debug(expectedReplicasPerTopicPartition(topicPartition) + " replicas match at offset " +
|
||||
nextOffset + " for " + topicPartition)
|
||||
debug(s"${expectedReplicasPerTopicPartition(topicPartition)} replicas match at offset " +
|
||||
s"$nextOffset for $topicPartition")
|
||||
}
|
||||
}
|
||||
if (maxHw - fetchOffsetMap.get(topicPartition) > maxLag) {
|
||||
|
|
|
@ -240,7 +240,7 @@ object CoreUtils {
|
|||
/**
|
||||
* Returns a list of duplicated items
|
||||
*/
|
||||
def duplicates[T](s: Traversable[T]): Iterable[T] = {
|
||||
def duplicates[T](s: Iterable[T]): Iterable[T] = {
|
||||
s.groupBy(identity)
|
||||
.map { case (k, l) => (k, l.size)}
|
||||
.filter { case (_, l) => l > 1 }
|
||||
|
|
|
@ -20,7 +20,7 @@ package kafka.utils
|
|||
import java.util
|
||||
import java.util.Properties
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.jdk.CollectionConverters._
|
||||
|
||||
/**
|
||||
* In order to have these implicits in scope, add the following import:
|
||||
|
|
|
@ -63,7 +63,7 @@ object Json {
|
|||
catch { case _: JsonProcessingException => None }
|
||||
|
||||
def tryParseBytes(input: Array[Byte]): Either[JsonProcessingException, JsonValue] =
|
||||
try Right(mapper.readTree(input)).right.map(JsonValue(_))
|
||||
try Right(mapper.readTree(input)).map(JsonValue(_))
|
||||
catch { case e: JsonProcessingException => Left(e) }
|
||||
|
||||
/**
|
||||
|
|
|
@ -23,7 +23,7 @@ import java.util.Locale
|
|||
import org.apache.log4j.{Level, LogManager, Logger}
|
||||
|
||||
import scala.collection.mutable
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.jdk.CollectionConverters._
|
||||
|
||||
|
||||
object Log4jController {
|
||||
|
|
|
@ -21,8 +21,8 @@ import java.util.concurrent._
|
|||
|
||||
import org.apache.kafka.common.KafkaException
|
||||
|
||||
import collection.mutable
|
||||
import collection.JavaConverters._
|
||||
import collection.Set
|
||||
import scala.jdk.CollectionConverters._
|
||||
|
||||
class Pool[K,V](valueFactory: Option[K => V] = None) extends Iterable[(K, V)] {
|
||||
|
||||
|
@ -69,7 +69,7 @@ class Pool[K,V](valueFactory: Option[K => V] = None) extends Iterable[(K, V)] {
|
|||
|
||||
def remove(key: K, value: V): Boolean = pool.remove(key, value)
|
||||
|
||||
def keys: mutable.Set[K] = pool.keySet.asScala
|
||||
def keys: Set[K] = pool.keySet.asScala
|
||||
|
||||
def values: Iterable[V] = pool.values.asScala
|
||||
|
||||
|
|
|
@ -21,7 +21,7 @@ import java.util.Properties
|
|||
import java.util.Collections
|
||||
import scala.collection._
|
||||
import kafka.message.{CompressionCodec, NoCompressionCodec}
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.jdk.CollectionConverters._
|
||||
|
||||
|
||||
class VerifiableProperties(val props: Properties) extends Logging {
|
||||
|
|
|
@ -19,7 +19,7 @@ package kafka.utils.json
|
|||
|
||||
import scala.collection.{Map, Seq}
|
||||
import scala.collection.compat._
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.jdk.CollectionConverters._
|
||||
|
||||
import com.fasterxml.jackson.databind.{JsonMappingException, JsonNode}
|
||||
|
||||
|
@ -82,7 +82,7 @@ object DecodeJson {
|
|||
|
||||
implicit def decodeOption[E](implicit decodeJson: DecodeJson[E]): DecodeJson[Option[E]] = (node: JsonNode) => {
|
||||
if (node.isNull) Right(None)
|
||||
else decodeJson.decodeEither(node).right.map(Some(_))
|
||||
else decodeJson.decodeEither(node).map(Some(_))
|
||||
}
|
||||
|
||||
implicit def decodeSeq[E, S[+T] <: Seq[E]](implicit decodeJson: DecodeJson[E], factory: Factory[E, S[E]]): DecodeJson[S[E]] = (node: JsonNode) => {
|
||||
|
@ -93,7 +93,7 @@ object DecodeJson {
|
|||
|
||||
implicit def decodeMap[V, M[K, +V] <: Map[K, V]](implicit decodeJson: DecodeJson[V], factory: Factory[(String, V), M[String, V]]): DecodeJson[M[String, V]] = (node: JsonNode) => {
|
||||
if (node.isObject)
|
||||
decodeIterator(node.fields.asScala)(e => decodeJson.decodeEither(e.getValue).right.map(v => (e.getKey, v)))
|
||||
decodeIterator(node.fields.asScala)(e => decodeJson.decodeEither(e.getValue).map(v => (e.getKey, v)))
|
||||
else Left(s"Expected JSON object, received $node")
|
||||
}
|
||||
|
||||
|
|
|
@ -17,8 +17,8 @@
|
|||
|
||||
package kafka.utils.json
|
||||
|
||||
import scala.collection.{Iterator, JavaConverters}
|
||||
import JavaConverters._
|
||||
import scala.collection.Iterator
|
||||
import scala.jdk.CollectionConverters._
|
||||
|
||||
import com.fasterxml.jackson.databind.node.ArrayNode
|
||||
|
||||
|
|
|
@ -19,7 +19,7 @@ package kafka.utils.json
|
|||
|
||||
import com.fasterxml.jackson.databind.JsonMappingException
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.jdk.CollectionConverters._
|
||||
|
||||
import com.fasterxml.jackson.databind.node.ObjectNode
|
||||
|
||||
|
|
|
@ -93,7 +93,8 @@ class AdminZkClient(zkClient: KafkaZkClient) extends Logging {
|
|||
zkClient.setOrCreateEntityConfigs(ConfigType.Topic, topic, config)
|
||||
|
||||
// create the partition assignment
|
||||
writeTopicPartitionAssignment(topic, partitionReplicaAssignment.mapValues(ReplicaAssignment(_)).toMap, isUpdate = false)
|
||||
writeTopicPartitionAssignment(topic, partitionReplicaAssignment.map { case (k, v) => k -> ReplicaAssignment(v) },
|
||||
isUpdate = false)
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -140,7 +141,7 @@ class AdminZkClient(zkClient: KafkaZkClient) extends Logging {
|
|||
val assignment = replicaAssignment.map { case (partitionId, replicas) => (new TopicPartition(topic,partitionId), replicas) }.toMap
|
||||
|
||||
if (!isUpdate) {
|
||||
zkClient.createTopicAssignment(topic, assignment.mapValues(_.replicas).toMap)
|
||||
zkClient.createTopicAssignment(topic, assignment.map { case (k, v) => k -> v.replicas })
|
||||
} else {
|
||||
zkClient.setTopicAssignment(topic, assignment)
|
||||
}
|
||||
|
@ -218,7 +219,7 @@ class AdminZkClient(zkClient: KafkaZkClient) extends Logging {
|
|||
|
||||
writeTopicPartitionAssignment(topic, proposedAssignment, isUpdate = true)
|
||||
}
|
||||
proposedAssignment.mapValues(_.replicas).toMap
|
||||
proposedAssignment.map { case (k, v) => k -> v.replicas }
|
||||
}
|
||||
|
||||
private def validateReplicaAssignment(replicaAssignment: Map[Int, Seq[Int]],
|
||||
|
|
Some files were not shown because too many files have changed in this diff Show More
Loading…
Reference in New Issue