MINOR: Make the build compile with Scala 2.13 (#6989)

Scala 2.13 support was added to the build via #5454. This PR adjusts the code so that
it compiles with 2.11, 2.12 and 2.13.

Changes:
* Add `scala-collection-compat` dependency.
* Import `scala.collection.Seq` in a number of places for consistent behavior across
Scala 2.11, 2.12 and 2.13.
* Remove wildcard imports that were causing Java classes to take priority over the
Scala ones (related Scala issue: https://github.com/scala/scala/pull/6589).
* Replace parallel collection usage with `Future`, since the former is no longer included
in the standard library by default (see the `Future`-based sketch after this list).
* Replace the `val _: Unit` workaround with one that is more concise and works with
Scala 2.13 (see the `foreach[Unit]` sketch after this list).
* Replace `filterKeys` with `filter` when we expect a `Map`. `filterKeys` returns a view
that doesn't implement the `Map` trait in Scala 2.13.
* Replace `mapValues` with `map` or add a `toMap` as an additional transformation
when we expect a `Map`. `mapValues` returns a view that doesn't implement the
`Map` trait in Scala 2.13 (both cases are shown in the view sketch after this list).
* Replace `breakOut` with `iterator` and `to`; `breakOut` was removed in Scala
2.13 (see the sketch after this list).
* Replace `to()` with `toMap`, `toIndexedSeq` and `toSet`.
* Replace `mutable.Buffer.--` with `filterNot`.
* `ControlThrowable` is an abstract class in Scala 2.13.
* Variable arguments can only receive arrays or `immutable.Seq` in Scala 2.13 (see the
varargs sketch after this list).
* Use `Factory` instead of `CanBuildFrom` in DecodeJson. `CanBuildFrom` behaves
a bit differently in Scala 2.13 and has been deprecated. `Factory` has the behavior
we need and is available via the compat library (see the sketch after this list).
* Fix failing tests due to a behavior change in Scala 2.13:
"Map.values.map is not strict in Scala 2.13" (https://github.com/scala/bug/issues/11589).
* Use Java collections instead of Scala ones in StreamResetter (a Java class).
* Adjust CheckpointFile.write to take an `Iterable` instead of `Seq` to avoid
unnecessary collection copies.
* Fix DelayedElectLeader to use a `Map` instead of a `Set` and to avoid a `to` call that
doesn't work in Scala 2.13.
* Use an unordered map for mapping in SimpleAclAuthorizer; mapping over ordered
maps requires an `Ordering` in Scala 2.13 for safety reasons.
* Adapt `ConsumerGroupCommand` to compile with Scala 2.13.
* `CoreUtils.min` takes an `Iterable` instead of a `TraversableOnce`; the latter is only a
deprecated alias for `IterableOnce` in Scala 2.13.
* Replace `Unit` with `()` in a couple of places. Scala 2.13 is stricter when it expects
a value instead of a type.
* Fix a bug in CustomQuotaCallbackTest where we did not necessarily set `partitionRatio`
correctly, since `forall` can terminate early.
* Add a couple of spotbugs exclusions that are needed by code generated by Scala 2.13.
* Remove unused variables, simplify some code and remove procedure syntax in a few
places.
* Remove unused `CoreUtils.JSONEscapeString`.
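
The sketches below are illustrative only: the collection contents, names and pool sizes
are made up and they are not the exact code from this PR. First, the parallel-collection
replacement, which runs the same work through `Future` on an explicit thread pool:

```scala
import java.util.concurrent.Executors

import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

object ParallelToFuture {
  private def recover(dir: String): Unit =
    println(s"recovering $dir on ${Thread.currentThread.getName}")

  def main(args: Array[String]): Unit = {
    val logDirs = Seq("log-0", "log-1", "log-2") // placeholder work items

    // Scala 2.12 and earlier: logDirs.par.foreach(recover)
    // Parallel collections are a separate module in 2.13, so the cross-buildable
    // version schedules the same work as Futures instead.
    val pool = Executors.newFixedThreadPool(logDirs.size)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    try {
      val jobs = logDirs.map(dir => Future(recover(dir)))
      Await.result(Future.sequence(jobs), 30.seconds)
    } finally {
      pool.shutdown()
    }
  }
}
```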
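
The `val _: Unit` replacement: pinning `foreach`'s type parameter to `Unit` gives the
lambda an expected result type of `Unit`, which silences the compiler complaint the old
workaround was targeting (the exact lint flag is assumed here, not verified):

```scala
import scala.collection.mutable

object ForeachUnit {
  def main(args: Array[String]): Unit = {
    val noop = mutable.Set.empty[Int]
    val failed = mutable.Map.empty[Int, String]
    val electionResults = Map(0 -> Option("timeout"), 1 -> Option.empty[String])

    // Old: electionResults.foreach { ... val _: Unit = if (...) ... }
    // New: force the function's result type to Unit so the non-Unit if-expression
    // is discarded without complaint, on 2.11, 2.12 and 2.13 alike.
    electionResults.foreach[Unit] { case (partition, error) =>
      if (error.isDefined) failed += partition -> error.get
      else noop += partition
    }

    println(s"noop=$noop failed=$failed")
  }
}
```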
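
The view behavior behind the `filterKeys`/`mapValues` items, with an invented offsets map:

```scala
object MapViewBehavior {
  def main(args: Array[String]): Unit = {
    val offsets = Map("tp-0" -> 0L, "tp-1" -> 5L, "tp-2" -> 9L)

    // In 2.11/2.12 these return lazy wrappers that still have type Map; in 2.13 they
    // are deprecated and return a MapView, which does not extend Map, so call sites
    // expecting a Map no longer compile:
    //   offsets.filterKeys(_ != "tp-1")   // MapView in 2.13
    //   offsets.mapValues(_ + 1)          // MapView in 2.13

    // Cross-buildable replacements:
    val filtered: Map[String, Long] = offsets.filter { case (k, _) => k != "tp-1" }
    val bumped: Map[String, Long] = offsets.map { case (k, v) => k -> (v + 1) }
    // (alternatively, keep mapValues/filterKeys and append .toMap where a strict Map is needed)

    println(filtered)
    println(bumped)
  }
}
```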
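
The `breakOut` replacement, again with made-up data:

```scala
object BreakOutReplacement {
  def main(args: Array[String]): Unit = {
    val partitions = Seq(0, 1, 2)

    // Scala 2.11/2.12 only; breakOut was removed in 2.13:
    //   val errors: Map[Int, String] =
    //     partitions.map(p => p -> "NOT_CONTROLLER")(collection.breakOut)

    // Going through an iterator keeps the "no intermediate collection" property
    // and compiles on all three versions:
    val errors: Map[Int, String] =
      partitions.iterator.map(p => p -> "NOT_CONTROLLER").toMap

    println(errors)
  }
}
```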
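
The varargs item; `describe` is a hypothetical stand-in for the real varargs APIs
(e.g. `MemoryRecords.withRecords` in the GroupMetadataManager change below):

```scala
import scala.collection.mutable.ArrayBuffer

object VarargsWithBuffers {
  def describe(records: String*): String = records.mkString(", ")

  def main(args: Array[String]): Unit = {
    val tombstones = ArrayBuffer("r1", "r2", "r3")

    // Scala 2.12 accepts `describe(tombstones: _*)` because varargs take any
    // collection.Seq. Scala 2.13 expects an Array or an immutable.Seq, so the
    // mutable buffer is converted first:
    println(describe(tombstones.toArray: _*))
    println(describe(tombstones.toSeq: _*)) // toSeq copies to an immutable.Seq on 2.13
  }
}
```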
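
Finally, the `Factory` approach; `decodeN` is a hypothetical stand-in for the DecodeJson
code path and leans on the scala-collection-compat shim for 2.11/2.12:

```scala
import scala.collection.compat._ // Factory alias/implicits for 2.11/2.12; mostly a no-op on 2.13

object FactoryInsteadOfCanBuildFrom {
  // 2.12-style signature (CanBuildFrom is deprecated and behaves differently in 2.13):
  //   def decodeN[A, C[_]](values: Seq[A])(implicit cbf: CanBuildFrom[Nothing, A, C[A]]): C[A]

  // Cross-buildable signature using Factory:
  def decodeN[A, C](values: Seq[A])(implicit factory: Factory[A, C]): C = {
    val builder = factory.newBuilder
    values.foreach(builder += _)
    builder.result()
  }

  def main(args: Array[String]): Unit = {
    val asList = decodeN[Int, List[Int]](Seq(1, 2, 3))
    val asSet = decodeN[Int, Set[Int]](Seq(1, 2, 2))
    println(s"$asList $asSet")
  }
}
```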

Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>, José Armando García Sancio <jsancio@users.noreply.github.com>
Ismael Juma 2019-07-02 06:29:39 -07:00 committed by GitHub
parent da07985b01
commit 6dd4ebcea7
125 changed files with 412 additions and 331 deletions

View File

@ -667,6 +667,7 @@ project(':core') {
compile libs.jacksonJDK8Datatypes compile libs.jacksonJDK8Datatypes
compile libs.joptSimple compile libs.joptSimple
compile libs.metrics compile libs.metrics
compile libs.scalaCollectionCompat
compile libs.scalaJava8Compat compile libs.scalaJava8Compat
compile libs.scalaLibrary compile libs.scalaLibrary
// only needed transitively, but set it explicitly to ensure it has the same version as scala-library // only needed transitively, but set it explicitly to ensure it has the same version as scala-library

View File

@ -194,7 +194,7 @@ object ConfigCommand extends Config {
*/ */
private def preProcessBrokerConfigs(configsToBeAdded: Properties, perBrokerConfig: Boolean) { private def preProcessBrokerConfigs(configsToBeAdded: Properties, perBrokerConfig: Boolean) {
val passwordEncoderConfigs = new Properties val passwordEncoderConfigs = new Properties
passwordEncoderConfigs ++= configsToBeAdded.asScala.filterKeys(_.startsWith("password.encoder.")) passwordEncoderConfigs ++= configsToBeAdded.asScala.filter { case (key, _) => key.startsWith("password.encoder.") }
if (!passwordEncoderConfigs.isEmpty) { if (!passwordEncoderConfigs.isEmpty) {
info(s"Password encoder configs ${passwordEncoderConfigs.keySet} will be used for encrypting" + info(s"Password encoder configs ${passwordEncoderConfigs.keySet} will be used for encrypting" +
" passwords, but will not be stored in ZooKeeper.") " passwords, but will not be stored in ZooKeeper.")

View File

@ -34,7 +34,7 @@ import org.apache.kafka.common.{KafkaException, Node, TopicPartition}
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer import scala.collection.mutable.ListBuffer
import scala.collection.{Seq, Set, mutable} import scala.collection.{immutable, Map, Seq, Set, mutable}
import scala.util.{Failure, Success, Try} import scala.util.{Failure, Success, Try}
import joptsimple.OptionSpec import joptsimple.OptionSpec
import scala.collection.immutable.TreeMap import scala.collection.immutable.TreeMap
@ -374,7 +374,7 @@ object ConsumerGroupCommand extends Logging {
).describedGroups() ).describedGroups()
val result = val result =
consumerGroups.asScala.foldLeft(Map[String, Map[TopicPartition, OffsetAndMetadata]]()) { consumerGroups.asScala.foldLeft(immutable.Map[String, Map[TopicPartition, OffsetAndMetadata]]()) {
case (acc, (groupId, groupDescription)) => case (acc, (groupId, groupDescription)) =>
groupDescription.get.state().toString match { groupDescription.get.state().toString match {
case "Empty" | "Dead" => case "Empty" | "Dead" =>
@ -418,7 +418,7 @@ object ConsumerGroupCommand extends Logging {
val groupOffsets = TreeMap[String, (Option[String], Option[Seq[PartitionAssignmentState]])]() ++ (for ((groupId, consumerGroup) <- consumerGroups) yield { val groupOffsets = TreeMap[String, (Option[String], Option[Seq[PartitionAssignmentState]])]() ++ (for ((groupId, consumerGroup) <- consumerGroups) yield {
val state = consumerGroup.state val state = consumerGroup.state
val committedOffsets = getCommittedOffsets(groupId).asScala.toMap val committedOffsets = getCommittedOffsets(groupId)
var assignedTopicPartitions = ListBuffer[TopicPartition]() var assignedTopicPartitions = ListBuffer[TopicPartition]()
val rowsWithConsumer = consumerGroup.members.asScala.filter(!_.assignment.topicPartitions.isEmpty).toSeq val rowsWithConsumer = consumerGroup.members.asScala.filter(!_.assignment.topicPartitions.isEmpty).toSeq
.sortWith(_.assignment.topicPartitions.size > _.assignment.topicPartitions.size).flatMap { consumerSummary => .sortWith(_.assignment.topicPartitions.size > _.assignment.topicPartitions.size).flatMap { consumerSummary =>
@ -441,7 +441,7 @@ object ConsumerGroupCommand extends Logging {
Map(topicPartition -> Some(offset.offset)), Map(topicPartition -> Some(offset.offset)),
Some(MISSING_COLUMN_VALUE), Some(MISSING_COLUMN_VALUE),
Some(MISSING_COLUMN_VALUE), Some(MISSING_COLUMN_VALUE),
Some(MISSING_COLUMN_VALUE)) Some(MISSING_COLUMN_VALUE)).toSeq
} }
groupId -> (Some(state.toString), Some(rowsWithConsumer ++ rowsWithoutConsumer)) groupId -> (Some(state.toString), Some(rowsWithConsumer ++ rowsWithoutConsumer))
}).toMap }).toMap
@ -576,8 +576,7 @@ object ConsumerGroupCommand extends Logging {
private def getPartitionsToReset(groupId: String): Seq[TopicPartition] = { private def getPartitionsToReset(groupId: String): Seq[TopicPartition] = {
if (opts.options.has(opts.allTopicsOpt)) { if (opts.options.has(opts.allTopicsOpt)) {
val allTopicPartitions = getCommittedOffsets(groupId).keySet().asScala.toSeq getCommittedOffsets(groupId).keys.toSeq
allTopicPartitions
} else if (opts.options.has(opts.topicOpt)) { } else if (opts.options.has(opts.topicOpt)) {
val topics = opts.options.valuesOf(opts.topicOpt).asScala val topics = opts.options.valuesOf(opts.topicOpt).asScala
parseTopicPartitionsToReset(groupId, topics) parseTopicPartitionsToReset(groupId, topics)
@ -589,19 +588,19 @@ object ConsumerGroupCommand extends Logging {
} }
} }
private def getCommittedOffsets(groupId: String) = { private def getCommittedOffsets(groupId: String): Map[TopicPartition, OffsetAndMetadata] = {
adminClient.listConsumerGroupOffsets( adminClient.listConsumerGroupOffsets(
groupId, groupId,
withTimeoutMs(new ListConsumerGroupOffsetsOptions) withTimeoutMs(new ListConsumerGroupOffsetsOptions)
).partitionsToOffsetAndMetadata.get ).partitionsToOffsetAndMetadata.get.asScala
} }
type GroupMetadata = Map[String, Map[TopicPartition, OffsetAndMetadata]] type GroupMetadata = immutable.Map[String, immutable.Map[TopicPartition, OffsetAndMetadata]]
private def parseResetPlan(resetPlanCsv: String): GroupMetadata = { private def parseResetPlan(resetPlanCsv: String): GroupMetadata = {
def updateGroupMetadata(group: String, topic: String, partition: Int, offset: Long, acc: GroupMetadata) = { def updateGroupMetadata(group: String, topic: String, partition: Int, offset: Long, acc: GroupMetadata) = {
val topicPartition = new TopicPartition(topic, partition) val topicPartition = new TopicPartition(topic, partition)
val offsetAndMetadata = new OffsetAndMetadata(offset) val offsetAndMetadata = new OffsetAndMetadata(offset)
val dataMap = acc.getOrElse(group, Map()).updated(topicPartition, offsetAndMetadata) val dataMap = acc.getOrElse(group, immutable.Map()).updated(topicPartition, offsetAndMetadata)
acc.updated(group, dataMap) acc.updated(group, dataMap)
} }
val csvReader = CsvUtils().readerFor[CsvRecordNoGroup] val csvReader = CsvUtils().readerFor[CsvRecordNoGroup]
@ -612,14 +611,14 @@ object ConsumerGroupCommand extends Logging {
// Single group CSV format: "topic,partition,offset" // Single group CSV format: "topic,partition,offset"
val dataMap = if (isSingleGroupQuery && isOldCsvFormat) { val dataMap = if (isSingleGroupQuery && isOldCsvFormat) {
val group = opts.options.valueOf(opts.groupOpt) val group = opts.options.valueOf(opts.groupOpt)
lines.foldLeft(Map[String, Map[TopicPartition, OffsetAndMetadata]]()) { (acc, line) => lines.foldLeft(immutable.Map[String, immutable.Map[TopicPartition, OffsetAndMetadata]]()) { (acc, line) =>
val CsvRecordNoGroup(topic, partition, offset) = csvReader.readValue[CsvRecordNoGroup](line) val CsvRecordNoGroup(topic, partition, offset) = csvReader.readValue[CsvRecordNoGroup](line)
updateGroupMetadata(group, topic, partition, offset, acc) updateGroupMetadata(group, topic, partition, offset, acc)
} }
// Multiple group CSV format: "group,topic,partition,offset" // Multiple group CSV format: "group,topic,partition,offset"
} else { } else {
val csvReader = CsvUtils().readerFor[CsvRecordWithGroup] val csvReader = CsvUtils().readerFor[CsvRecordWithGroup]
lines.foldLeft(Map[String, Map[TopicPartition, OffsetAndMetadata]]()) { (acc, line) => lines.foldLeft(immutable.Map[String, immutable.Map[TopicPartition, OffsetAndMetadata]]()) { (acc, line) =>
val CsvRecordWithGroup(group, topic, partition, offset) = csvReader.readValue[CsvRecordWithGroup](line) val CsvRecordWithGroup(group, topic, partition, offset) = csvReader.readValue[CsvRecordWithGroup](line)
updateGroupMetadata(group, topic, partition, offset, acc) updateGroupMetadata(group, topic, partition, offset, acc)
} }
@ -654,7 +653,7 @@ object ConsumerGroupCommand extends Logging {
val currentCommittedOffsets = getCommittedOffsets(groupId) val currentCommittedOffsets = getCommittedOffsets(groupId)
val requestedOffsets = partitionsToReset.map { topicPartition => val requestedOffsets = partitionsToReset.map { topicPartition =>
val shiftBy = opts.options.valueOf(opts.resetShiftByOpt) val shiftBy = opts.options.valueOf(opts.resetShiftByOpt)
val currentOffset = currentCommittedOffsets.asScala.getOrElse(topicPartition, val currentOffset = currentCommittedOffsets.getOrElse(topicPartition,
throw new IllegalArgumentException(s"Cannot shift offset for partition $topicPartition since there is no current committed offset")).offset throw new IllegalArgumentException(s"Cannot shift offset for partition $topicPartition since there is no current committed offset")).offset
(topicPartition, currentOffset + shiftBy) (topicPartition, currentOffset + shiftBy)
}.toMap }.toMap
@ -706,8 +705,8 @@ object ConsumerGroupCommand extends Logging {
val preparedOffsetsForPartitionsWithCommittedOffset = partitionsToResetWithCommittedOffset.map { topicPartition => val preparedOffsetsForPartitionsWithCommittedOffset = partitionsToResetWithCommittedOffset.map { topicPartition =>
(topicPartition, new OffsetAndMetadata(currentCommittedOffsets.get(topicPartition) match { (topicPartition, new OffsetAndMetadata(currentCommittedOffsets.get(topicPartition) match {
case offset if offset != null => offset.offset case Some(offset) => offset.offset
case _ => throw new IllegalStateException(s"Expected a valid current offset for topic partition: $topicPartition") case None => throw new IllegalStateException(s"Expected a valid current offset for topic partition: $topicPartition")
})) }))
}.toMap }.toMap
@ -756,7 +755,7 @@ object ConsumerGroupCommand extends Logging {
if (isSingleGroupQuery) CsvRecordNoGroup(k.topic, k.partition, v.offset) if (isSingleGroupQuery) CsvRecordNoGroup(k.topic, k.partition, v.offset)
else CsvRecordWithGroup(groupId, k.topic, k.partition, v.offset) else CsvRecordWithGroup(groupId, k.topic, k.partition, v.offset)
csvWriter.writeValueAsString(csvRecord) csvWriter.writeValueAsString(csvRecord)
}(collection.breakOut): List[String] }
} }
rows.mkString("") rows.mkString("")
} }

View File

@ -29,6 +29,7 @@ import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.utils.Utils import org.apache.kafka.common.utils.Utils
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
import scala.collection.Seq
/** /**
* A command for delete records of the given partitions down to the specified offset. * A command for delete records of the given partitions down to the specified offset.

View File

@ -148,8 +148,8 @@ object LeaderElectionCommand extends Logging {
val noop = mutable.Set.empty[TopicPartition] val noop = mutable.Set.empty[TopicPartition]
val failed = mutable.Map.empty[TopicPartition, Throwable] val failed = mutable.Map.empty[TopicPartition, Throwable]
electionResults.foreach { case (topicPartition, error) => electionResults.foreach[Unit] { case (topicPartition, error) =>
val _: Unit = if (error.isPresent) { if (error.isPresent) {
error.get match { error.get match {
case _: ElectionNotNeededException => noop += topicPartition case _: ElectionNotNeededException => noop += topicPartition
case _ => failed += topicPartition -> error.get case _ => failed += topicPartition -> error.get

View File

@ -48,7 +48,7 @@ object LogDirsCommand {
out.println("Querying brokers for log directories information") out.println("Querying brokers for log directories information")
val describeLogDirsResult: DescribeLogDirsResult = adminClient.describeLogDirs(brokerList.map(Integer.valueOf).toSeq.asJava) val describeLogDirsResult: DescribeLogDirsResult = adminClient.describeLogDirs(brokerList.map(Integer.valueOf).toSeq.asJava)
val logDirInfosByBroker = describeLogDirsResult.all.get().asScala.mapValues(_.asScala) val logDirInfosByBroker = describeLogDirsResult.all.get().asScala.mapValues(_.asScala).toMap
out.println(s"Received log directory information from brokers ${brokerList.mkString(",")}") out.println(s"Received log directory information from brokers ${brokerList.mkString(",")}")
out.println(formatAsJson(logDirInfosByBroker, topicList.toSet)) out.println(formatAsJson(logDirInfosByBroker, topicList.toSet))

View File

@ -243,8 +243,8 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
val noop = mutable.Set.empty[TopicPartition] val noop = mutable.Set.empty[TopicPartition]
val failed = mutable.Map.empty[TopicPartition, Throwable] val failed = mutable.Map.empty[TopicPartition, Throwable]
electionResults.foreach { case (topicPartition, error) => electionResults.foreach[Unit] { case (topicPartition, error) =>
val _: Unit = if (error.isPresent) { if (error.isPresent) {
if (error.get.isInstanceOf[ElectionNotNeededException]) { if (error.get.isInstanceOf[ElectionNotNeededException]) {
noop += topicPartition noop += topicPartition
} else { } else {

View File

@ -640,7 +640,8 @@ class ReassignPartitionsCommand(zkClient: KafkaZkClient,
val replicasAssignedToFutureDir = mutable.Set.empty[TopicPartitionReplica] val replicasAssignedToFutureDir = mutable.Set.empty[TopicPartitionReplica]
while (remainingTimeMs > 0 && replicasAssignedToFutureDir.size < proposedReplicaAssignment.size) { while (remainingTimeMs > 0 && replicasAssignedToFutureDir.size < proposedReplicaAssignment.size) {
replicasAssignedToFutureDir ++= alterReplicaLogDirsIgnoreReplicaNotAvailable( replicasAssignedToFutureDir ++= alterReplicaLogDirsIgnoreReplicaNotAvailable(
proposedReplicaAssignment.filterKeys(replica => !replicasAssignedToFutureDir.contains(replica)), adminClientOpt.get, remainingTimeMs) proposedReplicaAssignment.filter { case (replica, _) => !replicasAssignedToFutureDir.contains(replica) },
adminClientOpt.get, remainingTimeMs)
Thread.sleep(100) Thread.sleep(100)
remainingTimeMs = startTimeMs + timeoutMs - System.currentTimeMillis() remainingTimeMs = startTimeMs + timeoutMs - System.currentTimeMillis()
} }

View File

@ -20,7 +20,6 @@ import org.apache.kafka.common.ElectionType
import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.requests.ElectLeadersRequest import org.apache.kafka.common.requests.ElectLeadersRequest
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
import scala.collection.breakOut
package object api { package object api {
implicit final class ElectLeadersRequestOps(val self: ElectLeadersRequest) extends AnyVal { implicit final class ElectLeadersRequestOps(val self: ElectLeadersRequest) extends AnyVal {
@ -28,11 +27,11 @@ package object api {
if (self.data.topicPartitions == null) { if (self.data.topicPartitions == null) {
Set.empty Set.empty
} else { } else {
self.data.topicPartitions.asScala.flatMap { topicPartition => self.data.topicPartitions.asScala.iterator.flatMap { topicPartition =>
topicPartition.partitionId.asScala.map { partitionId => topicPartition.partitionId.asScala.map { partitionId =>
new TopicPartition(topicPartition.topic, partitionId) new TopicPartition(topicPartition.topic, partitionId)
} }
}(breakOut) }.toSet
} }
} }

View File

@ -22,6 +22,8 @@ import org.apache.kafka.common.Node
import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.security.auth.SecurityProtocol
import scala.collection.Seq
/** /**
* A Kafka broker. * A Kafka broker.
* A broker has an id, a collection of end-points, an optional rack and a listener to security protocol map. * A broker has an id, a collection of end-points, an optional rack and a listener to security protocol map.

View File

@ -41,7 +41,7 @@ import org.apache.kafka.common.requests._
import org.apache.kafka.common.utils.Time import org.apache.kafka.common.utils.Time
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
import scala.collection.Map import scala.collection.{Map, Seq}
trait PartitionStateStore { trait PartitionStateStore {
def fetchTopicConfig(): Properties def fetchTopicConfig(): Properties

View File

@ -25,6 +25,7 @@ import kafka.zk.{KafkaZkClient, StateChangeHandlers}
import kafka.zookeeper.{StateChangeHandler, ZNodeChildChangeHandler} import kafka.zookeeper.{StateChangeHandler, ZNodeChildChangeHandler}
import org.apache.kafka.common.utils.Time import org.apache.kafka.common.utils.Time
import scala.collection.Seq
import scala.util.{Failure, Try} import scala.util.{Failure, Try}
/** /**

View File

@ -38,7 +38,7 @@ import org.apache.kafka.common.{KafkaException, Node, Reconfigurable, TopicParti
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
import scala.collection.mutable.{HashMap, ListBuffer} import scala.collection.mutable.{HashMap, ListBuffer}
import scala.collection.{Set, mutable} import scala.collection.{Seq, Set, mutable}
object ControllerChannelManager { object ControllerChannelManager {
val QueueSizeMetricName = "QueueSize" val QueueSizeMetricName = "QueueSize"

View File

@ -20,7 +20,7 @@ package kafka.controller
import kafka.cluster.Broker import kafka.cluster.Broker
import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.TopicPartition
import scala.collection.{Seq, Set, mutable} import scala.collection.{Map, Seq, Set, mutable}
class ControllerContext { class ControllerContext {
val stats = new ControllerStats val stats = new ControllerStats
@ -113,7 +113,7 @@ class ControllerContext {
def removeLiveBrokers(brokerIds: Set[Int]): Unit = { def removeLiveBrokers(brokerIds: Set[Int]): Unit = {
liveBrokers = liveBrokers.filter(broker => !brokerIds.contains(broker.id)) liveBrokers = liveBrokers.filter(broker => !brokerIds.contains(broker.id))
liveBrokerEpochs = liveBrokerEpochs.filterKeys(id => !brokerIds.contains(id)) liveBrokerEpochs = liveBrokerEpochs.filter { case (id, _) => !brokerIds.contains(id) }
} }
def updateBrokerMetadata(oldMetadata: Broker, newMetadata: Broker): Unit = { def updateBrokerMetadata(oldMetadata: Broker, newMetadata: Broker): Unit = {

View File

@ -19,6 +19,8 @@ package kafka.controller
import kafka.api.LeaderAndIsr import kafka.api.LeaderAndIsr
import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.TopicPartition
import scala.collection.Seq
case class ElectionResult(topicPartition: TopicPartition, leaderAndIsr: Option[LeaderAndIsr], liveReplicas: Seq[Int]) case class ElectionResult(topicPartition: TopicPartition, leaderAndIsr: Option[LeaderAndIsr], liveReplicas: Seq[Int])
object Election { object Election {

View File

@ -41,7 +41,7 @@ import org.apache.zookeeper.KeeperException
import org.apache.zookeeper.KeeperException.Code import org.apache.zookeeper.KeeperException.Code
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
import scala.collection._ import scala.collection.{Map, Seq, Set, immutable, mutable}
import scala.util.{Failure, Try} import scala.util.{Failure, Try}
sealed trait ElectionTrigger sealed trait ElectionTrigger
@ -897,7 +897,7 @@ class KafkaController(val config: KafkaConfig,
// Ensure we detect future reassignments // Ensure we detect future reassignments
eventManager.put(PartitionReassignment) eventManager.put(PartitionReassignment)
} else { } else {
val reassignment = updatedPartitionsBeingReassigned.mapValues(_.newReplicas) val reassignment = updatedPartitionsBeingReassigned.map { case (k, v) => k -> v.newReplicas }
try zkClient.setOrCreatePartitionReassignment(reassignment, controllerContext.epochZkVersion) try zkClient.setOrCreatePartitionReassignment(reassignment, controllerContext.epochZkVersion)
catch { catch {
case e: KeeperException => throw new AdminOperationException(e) case e: KeeperException => throw new AdminOperationException(e)
@ -1276,8 +1276,8 @@ class KafkaController(val config: KafkaConfig,
val deadBrokerIds = liveOrShuttingDownBrokerIds -- curBrokerIds val deadBrokerIds = liveOrShuttingDownBrokerIds -- curBrokerIds
val bouncedBrokerIds = (curBrokerIds & liveOrShuttingDownBrokerIds) val bouncedBrokerIds = (curBrokerIds & liveOrShuttingDownBrokerIds)
.filter(brokerId => curBrokerIdAndEpochs(brokerId) > controllerContext.liveBrokerIdAndEpochs(brokerId)) .filter(brokerId => curBrokerIdAndEpochs(brokerId) > controllerContext.liveBrokerIdAndEpochs(brokerId))
val newBrokerAndEpochs = curBrokerAndEpochs.filterKeys(broker => newBrokerIds.contains(broker.id)) val newBrokerAndEpochs = curBrokerAndEpochs.filter { case (broker, _) => newBrokerIds.contains(broker.id) }
val bouncedBrokerAndEpochs = curBrokerAndEpochs.filterKeys(broker => bouncedBrokerIds.contains(broker.id)) val bouncedBrokerAndEpochs = curBrokerAndEpochs.filter { case (broker, _) => bouncedBrokerIds.contains(broker.id) }
val newBrokerIdsSorted = newBrokerIds.toSeq.sorted val newBrokerIdsSorted = newBrokerIds.toSeq.sorted
val deadBrokerIdsSorted = deadBrokerIds.toSeq.sorted val deadBrokerIdsSorted = deadBrokerIds.toSeq.sorted
val liveBrokerIdsSorted = curBrokerIds.toSeq.sorted val liveBrokerIdsSorted = curBrokerIds.toSeq.sorted
@ -1358,7 +1358,7 @@ class KafkaController(val config: KafkaConfig,
} }
private def processPartitionModifications(topic: String): Unit = { private def processPartitionModifications(topic: String): Unit = {
def restorePartitionReplicaAssignment(topic: String, newPartitionReplicaAssignment : immutable.Map[TopicPartition, Seq[Int]]): Unit = { def restorePartitionReplicaAssignment(topic: String, newPartitionReplicaAssignment: Map[TopicPartition, Seq[Int]]): Unit = {
info("Restoring the partition replica assignment for topic %s".format(topic)) info("Restoring the partition replica assignment for topic %s".format(topic))
val existingPartitions = zkClient.getChildren(TopicPartitionsZNode.path(topic)) val existingPartitions = zkClient.getChildren(TopicPartitionsZNode.path(topic))
@ -1505,7 +1505,7 @@ class KafkaController(val config: KafkaConfig,
): Unit = { ): Unit = {
callback( callback(
partitionsFromAdminClientOpt.fold(Map.empty[TopicPartition, Either[ApiError, Int]]) { partitions => partitionsFromAdminClientOpt.fold(Map.empty[TopicPartition, Either[ApiError, Int]]) { partitions =>
partitions.map(partition => partition -> Left(new ApiError(Errors.NOT_CONTROLLER, null)))(breakOut) partitions.iterator.map(partition => partition -> Left(new ApiError(Errors.NOT_CONTROLLER, null))).toMap
} }
) )
} }
@ -1518,7 +1518,7 @@ class KafkaController(val config: KafkaConfig,
): Unit = { ): Unit = {
if (!isActive) { if (!isActive) {
callback(partitionsFromAdminClientOpt.fold(Map.empty[TopicPartition, Either[ApiError, Int]]) { partitions => callback(partitionsFromAdminClientOpt.fold(Map.empty[TopicPartition, Either[ApiError, Int]]) { partitions =>
partitions.map(partition => partition -> Left(new ApiError(Errors.NOT_CONTROLLER, null)))(breakOut) partitions.iterator.map(partition => partition -> Left(new ApiError(Errors.NOT_CONTROLLER, null))).toMap
}) })
} else { } else {
// We need to register the watcher if the path doesn't exist in order to detect future preferred replica // We need to register the watcher if the path doesn't exist in order to detect future preferred replica
@ -1556,19 +1556,19 @@ class KafkaController(val config: KafkaConfig,
} }
} }
val results = onReplicaElection(electablePartitions, electionType, electionTrigger).mapValues { val results = onReplicaElection(electablePartitions, electionType, electionTrigger).map {
case Left(ex) => case (k, Left(ex)) =>
if (ex.isInstanceOf[StateChangeFailedException]) { if (ex.isInstanceOf[StateChangeFailedException]) {
val error = if (electionType == ElectionType.PREFERRED) { val error = if (electionType == ElectionType.PREFERRED) {
Errors.PREFERRED_LEADER_NOT_AVAILABLE Errors.PREFERRED_LEADER_NOT_AVAILABLE
} else { } else {
Errors.ELIGIBLE_LEADERS_NOT_AVAILABLE Errors.ELIGIBLE_LEADERS_NOT_AVAILABLE
} }
Left(new ApiError(error, ex.getMessage)) k -> Left(new ApiError(error, ex.getMessage))
} else { } else {
Left(ApiError.fromThrowable(ex)) k -> Left(ApiError.fromThrowable(ex))
} }
case Right(leaderAndIsr) => Right(leaderAndIsr.leader) case (k, Right(leaderAndIsr)) => k -> Right(leaderAndIsr.leader)
} ++ } ++
alreadyValidLeader.map(_ -> Left(new ApiError(Errors.ELECTION_NOT_NEEDED))) ++ alreadyValidLeader.map(_ -> Left(new ApiError(Errors.ELECTION_NOT_NEEDED))) ++
partitionsBeingDeleted.map( partitionsBeingDeleted.map(

View File

@ -28,8 +28,7 @@ import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.ControllerMovedException import org.apache.kafka.common.errors.ControllerMovedException
import org.apache.zookeeper.KeeperException import org.apache.zookeeper.KeeperException
import org.apache.zookeeper.KeeperException.Code import org.apache.zookeeper.KeeperException.Code
import scala.collection.breakOut import scala.collection.{Map, Seq, mutable}
import scala.collection.mutable
abstract class PartitionStateMachine(controllerContext: ControllerContext) extends Logging { abstract class PartitionStateMachine(controllerContext: ControllerContext) extends Logging {
/** /**
@ -165,7 +164,7 @@ class ZkPartitionStateMachine(config: KafkaConfig,
throw e throw e
case e: Throwable => case e: Throwable =>
error(s"Error while moving some partitions to $targetState state", e) error(s"Error while moving some partitions to $targetState state", e)
partitions.map(_ -> Left(e))(breakOut) partitions.iterator.map(_ -> Left(e)).toMap
} }
} else { } else {
Map.empty Map.empty
@ -368,7 +367,7 @@ class ZkPartitionStateMachine(config: KafkaConfig,
zkClient.getTopicPartitionStatesRaw(partitions) zkClient.getTopicPartitionStatesRaw(partitions)
} catch { } catch {
case e: Exception => case e: Exception =>
return (partitions.map(_ -> Left(e))(breakOut), Seq.empty) return (partitions.iterator.map(_ -> Left(e)).toMap, Seq.empty)
} }
val failedElections = mutable.Map.empty[TopicPartition, Either[Exception, LeaderAndIsr]] val failedElections = mutable.Map.empty[TopicPartition, Either[Exception, LeaderAndIsr]]
val validLeaderAndIsrs = mutable.Buffer.empty[(TopicPartition, LeaderAndIsr)] val validLeaderAndIsrs = mutable.Buffer.empty[(TopicPartition, LeaderAndIsr)]
@ -470,7 +469,7 @@ class ZkPartitionStateMachine(config: KafkaConfig,
} }
} else { } else {
val (logConfigs, failed) = zkClient.getLogConfigs( val (logConfigs, failed) = zkClient.getLogConfigs(
partitionsWithNoLiveInSyncReplicas.map { case (partition, _) => partition.topic }(breakOut), partitionsWithNoLiveInSyncReplicas.iterator.map { case (partition, _) => partition.topic }.toSet,
config.originals() config.originals()
) )

View File

@ -26,8 +26,7 @@ import kafka.zk.TopicPartitionStateZNode
import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.ControllerMovedException import org.apache.kafka.common.errors.ControllerMovedException
import org.apache.zookeeper.KeeperException.Code import org.apache.zookeeper.KeeperException.Code
import scala.collection.breakOut import scala.collection.{Seq, mutable}
import scala.collection.mutable
abstract class ReplicaStateMachine(controllerContext: ControllerContext) extends Logging { abstract class ReplicaStateMachine(controllerContext: ControllerContext) extends Logging {
/** /**
@ -337,7 +336,7 @@ class ZkReplicaStateMachine(config: KafkaConfig,
) )
val exceptionsForPartitionsWithNoLeaderAndIsrInZk: Map[TopicPartition, Either[Exception, LeaderIsrAndControllerEpoch]] = val exceptionsForPartitionsWithNoLeaderAndIsrInZk: Map[TopicPartition, Either[Exception, LeaderIsrAndControllerEpoch]] =
partitionsWithNoLeaderAndIsrInZk.flatMap { partition => partitionsWithNoLeaderAndIsrInZk.iterator.flatMap { partition =>
if (!controllerContext.isTopicQueuedUpForDeletion(partition.topic)) { if (!controllerContext.isTopicQueuedUpForDeletion(partition.topic)) {
val exception = new StateChangeFailedException( val exception = new StateChangeFailedException(
s"Failed to change state of replica $replicaId for partition $partition since the leader and isr " + s"Failed to change state of replica $replicaId for partition $partition since the leader and isr " +
@ -345,7 +344,7 @@ class ZkReplicaStateMachine(config: KafkaConfig,
) )
Option(partition -> Left(exception)) Option(partition -> Left(exception))
} else None } else None
}(breakOut) }.toMap
val leaderIsrAndControllerEpochs: Map[TopicPartition, Either[Exception, LeaderIsrAndControllerEpoch]] = val leaderIsrAndControllerEpochs: Map[TopicPartition, Either[Exception, LeaderIsrAndControllerEpoch]] =
(leaderAndIsrsWithoutReplica ++ finishedPartitions).map { case (partition, result: Either[Exception, LeaderAndIsr]) => (leaderAndIsrsWithoutReplica ++ finishedPartitions).map { case (partition, result: Either[Exception, LeaderAndIsr]) =>
@ -381,15 +380,15 @@ class ZkReplicaStateMachine(config: KafkaConfig,
zkClient.getTopicPartitionStatesRaw(partitions) zkClient.getTopicPartitionStatesRaw(partitions)
} catch { } catch {
case e: Exception => case e: Exception =>
return (partitions.map(_ -> Left(e))(breakOut), Seq.empty) return (partitions.iterator.map(_ -> Left(e)).toMap, Seq.empty)
} }
val partitionsWithNoLeaderAndIsrInZk = mutable.Buffer.empty[TopicPartition] val partitionsWithNoLeaderAndIsrInZk = mutable.Buffer.empty[TopicPartition]
val result = mutable.Map.empty[TopicPartition, Either[Exception, LeaderAndIsr]] val result = mutable.Map.empty[TopicPartition, Either[Exception, LeaderAndIsr]]
getDataResponses.foreach { getDataResponse => getDataResponses.foreach[Unit] { getDataResponse =>
val partition = getDataResponse.ctx.get.asInstanceOf[TopicPartition] val partition = getDataResponse.ctx.get.asInstanceOf[TopicPartition]
val _: Unit = if (getDataResponse.resultCode == Code.OK) { if (getDataResponse.resultCode == Code.OK) {
TopicPartitionStateZNode.decode(getDataResponse.data, getDataResponse.stat) match { TopicPartitionStateZNode.decode(getDataResponse.data, getDataResponse.stat) match {
case None => case None =>
partitionsWithNoLeaderAndIsrInZk += partition partitionsWithNoLeaderAndIsrInZk += partition

View File

@ -557,7 +557,7 @@ class GroupCoordinator(val brokerId: Int,
offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata], offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
responseCallback: immutable.Map[TopicPartition, Errors] => Unit): Unit = { responseCallback: immutable.Map[TopicPartition, Errors] => Unit): Unit = {
validateGroupStatus(groupId, ApiKeys.TXN_OFFSET_COMMIT) match { validateGroupStatus(groupId, ApiKeys.TXN_OFFSET_COMMIT) match {
case Some(error) => responseCallback(offsetMetadata.mapValues(_ => error)) case Some(error) => responseCallback(offsetMetadata.map { case (k, _) => k -> error })
case None => case None =>
val group = groupManager.getGroup(groupId).getOrElse { val group = groupManager.getGroup(groupId).getOrElse {
groupManager.addGroup(new GroupMetadata(groupId, Empty, time)) groupManager.addGroup(new GroupMetadata(groupId, Empty, time))
@ -573,7 +573,7 @@ class GroupCoordinator(val brokerId: Int,
offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata], offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
responseCallback: immutable.Map[TopicPartition, Errors] => Unit) { responseCallback: immutable.Map[TopicPartition, Errors] => Unit) {
validateGroupStatus(groupId, ApiKeys.OFFSET_COMMIT) match { validateGroupStatus(groupId, ApiKeys.OFFSET_COMMIT) match {
case Some(error) => responseCallback(offsetMetadata.mapValues(_ => error)) case Some(error) => responseCallback(offsetMetadata.map { case (k, _) => k -> error })
case None => case None =>
groupManager.getGroup(groupId) match { groupManager.getGroup(groupId) match {
case None => case None =>
@ -584,7 +584,7 @@ class GroupCoordinator(val brokerId: Int,
offsetMetadata, responseCallback) offsetMetadata, responseCallback)
} else { } else {
// or this is a request coming from an older generation. either way, reject the commit // or this is a request coming from an older generation. either way, reject the commit
responseCallback(offsetMetadata.mapValues(_ => Errors.ILLEGAL_GENERATION)) responseCallback(offsetMetadata.map { case (k, _) => k -> Errors.ILLEGAL_GENERATION })
} }
case Some(group) => case Some(group) =>
@ -616,17 +616,17 @@ class GroupCoordinator(val brokerId: Int,
// from the coordinator metadata; it is likely that the group has migrated to some other // from the coordinator metadata; it is likely that the group has migrated to some other
// coordinator OR the group is in a transient unstable phase. Let the member retry // coordinator OR the group is in a transient unstable phase. Let the member retry
// finding the correct coordinator and rejoin. // finding the correct coordinator and rejoin.
responseCallback(offsetMetadata.mapValues(_ => Errors.COORDINATOR_NOT_AVAILABLE)) responseCallback(offsetMetadata.map { case (k, _) => k -> Errors.COORDINATOR_NOT_AVAILABLE })
} else if (group.isStaticMemberFenced(memberId, groupInstanceId)) { } else if (group.isStaticMemberFenced(memberId, groupInstanceId)) {
responseCallback(offsetMetadata.mapValues(_ => Errors.FENCED_INSTANCE_ID)) responseCallback(offsetMetadata.map { case (k, _) => k -> Errors.FENCED_INSTANCE_ID })
} else if ((generationId < 0 && group.is(Empty)) || (producerId != NO_PRODUCER_ID)) { } else if ((generationId < 0 && group.is(Empty)) || (producerId != NO_PRODUCER_ID)) {
// The group is only using Kafka to store offsets. // The group is only using Kafka to store offsets.
// Also, for transactional offset commits we don't need to validate group membership and the generation. // Also, for transactional offset commits we don't need to validate group membership and the generation.
groupManager.storeOffsets(group, memberId, offsetMetadata, responseCallback, producerId, producerEpoch) groupManager.storeOffsets(group, memberId, offsetMetadata, responseCallback, producerId, producerEpoch)
} else if (!group.has(memberId)) { } else if (!group.has(memberId)) {
responseCallback(offsetMetadata.mapValues(_ => Errors.UNKNOWN_MEMBER_ID)) responseCallback(offsetMetadata.map { case (k, _) => k -> Errors.UNKNOWN_MEMBER_ID })
} else if (generationId != group.generationId) { } else if (generationId != group.generationId) {
responseCallback(offsetMetadata.mapValues(_ => Errors.ILLEGAL_GENERATION)) responseCallback(offsetMetadata.map { case (k, _) => k -> Errors.ILLEGAL_GENERATION })
} else { } else {
group.currentState match { group.currentState match {
case Stable | PreparingRebalance => case Stable | PreparingRebalance =>
@ -641,7 +641,7 @@ class GroupCoordinator(val brokerId: Int,
// but since the consumer's member.id and generation is valid, it means it has received // but since the consumer's member.id and generation is valid, it means it has received
// the latest group generation information from the JoinResponse. // the latest group generation information from the JoinResponse.
// So let's return a REBALANCE_IN_PROGRESS to let consumer handle it gracefully. // So let's return a REBALANCE_IN_PROGRESS to let consumer handle it gracefully.
responseCallback(offsetMetadata.mapValues(_ => Errors.REBALANCE_IN_PROGRESS)) responseCallback(offsetMetadata.map { case (k, _) => k -> Errors.REBALANCE_IN_PROGRESS })
case _ => case _ =>
throw new RuntimeException(s"Logic error: unexpected group state ${group.currentState}") throw new RuntimeException(s"Logic error: unexpected group state ${group.currentState}")

View File

@ -46,7 +46,7 @@ import org.apache.kafka.common.utils.{Time, Utils}
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
import scala.collection._ import scala.collection._
import scala.collection.mutable.ListBuffer import scala.collection.mutable.ArrayBuffer
class GroupMetadataManager(brokerId: Int, class GroupMetadataManager(brokerId: Int,
interBrokerProtocolVersion: ApiVersion, interBrokerProtocolVersion: ApiVersion,
@ -312,7 +312,7 @@ class GroupMetadataManager(brokerId: Int,
// construct the message set to append // construct the message set to append
if (filteredOffsetMetadata.isEmpty) { if (filteredOffsetMetadata.isEmpty) {
// compute the final error codes for the commit response // compute the final error codes for the commit response
val commitStatus = offsetMetadata.mapValues(_ => Errors.OFFSET_METADATA_TOO_LARGE) val commitStatus = offsetMetadata.map { case (k, v) => k -> Errors.OFFSET_METADATA_TOO_LARGE }
responseCallback(commitStatus) responseCallback(commitStatus)
None None
} else { } else {
@ -757,7 +757,7 @@ class GroupMetadataManager(brokerId: Int,
val timestamp = time.milliseconds() val timestamp = time.milliseconds()
replicaManager.nonOfflinePartition(appendPartition).foreach { partition => replicaManager.nonOfflinePartition(appendPartition).foreach { partition =>
val tombstones = ListBuffer.empty[SimpleRecord] val tombstones = ArrayBuffer.empty[SimpleRecord]
removedOffsets.foreach { case (topicPartition, offsetAndMetadata) => removedOffsets.foreach { case (topicPartition, offsetAndMetadata) =>
trace(s"Removing expired/deleted offset and metadata for $groupId, $topicPartition: $offsetAndMetadata") trace(s"Removing expired/deleted offset and metadata for $groupId, $topicPartition: $offsetAndMetadata")
val commitKey = GroupMetadataManager.offsetCommitKey(groupId, topicPartition) val commitKey = GroupMetadataManager.offsetCommitKey(groupId, topicPartition)
@ -780,7 +780,7 @@ class GroupMetadataManager(brokerId: Int,
try { try {
// do not need to require acks since even if the tombstone is lost, // do not need to require acks since even if the tombstone is lost,
// it will be appended again in the next purge cycle // it will be appended again in the next purge cycle
val records = MemoryRecords.withRecords(magicValue, 0L, compressionType, timestampType, tombstones: _*) val records = MemoryRecords.withRecords(magicValue, 0L, compressionType, timestampType, tombstones.toArray: _*)
partition.appendRecordsToLeader(records, isFromClient = false, requiredAcks = 0) partition.appendRecordsToLeader(records, isFromClient = false, requiredAcks = 0)
offsetsRemoved += removedOffsets.size offsetsRemoved += removedOffsets.size

View File

@ -36,7 +36,7 @@ import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.Time import org.apache.kafka.common.utils.Time
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
import scala.collection.{Iterable, Set, mutable} import scala.collection.{Iterable, Seq, Set, mutable}
import scala.util.control.ControlThrowable import scala.util.control.ControlThrowable
/** /**

View File

@ -32,7 +32,7 @@ import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.utils.Time import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.errors.KafkaStorageException import org.apache.kafka.common.errors.KafkaStorageException
import scala.collection.{Iterable, immutable, mutable} import scala.collection.{Iterable, Seq, immutable, mutable}
private[log] sealed trait LogCleaningState private[log] sealed trait LogCleaningState
private[log] case object LogCleaningInProgress extends LogCleaningState private[log] case object LogCleaningInProgress extends LogCleaningState
@ -356,7 +356,7 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
val checkpoint = checkpoints(dataDir) val checkpoint = checkpoints(dataDir)
if (checkpoint != null) { if (checkpoint != null) {
try { try {
val existing = checkpoint.read().filterKeys(logs.keys) ++ update val existing = checkpoint.read().filter { case (k, _) => logs.keys.contains(k) } ++ update
checkpoint.write(existing) checkpoint.write(existing)
} catch { } catch {
case e: KafkaStorageException => case e: KafkaStorageException =>
@ -393,7 +393,7 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
def handleLogDirFailure(dir: String) { def handleLogDirFailure(dir: String) {
info(s"Stopping cleaning logs in dir $dir") info(s"Stopping cleaning logs in dir $dir")
inLock(lock) { inLock(lock) {
checkpoints = checkpoints.filterKeys(_.getAbsolutePath != dir) checkpoints = checkpoints.filter { case (k, _) => k.getAbsolutePath != dir }
} }
} }

View File

@ -103,7 +103,7 @@ class LogManager(logDirs: Seq[File],
private val preferredLogDirs = new ConcurrentHashMap[TopicPartition, String]() private val preferredLogDirs = new ConcurrentHashMap[TopicPartition, String]()
private def offlineLogDirs: Iterable[File] = { private def offlineLogDirs: Iterable[File] = {
val logDirsSet = mutable.Set[File](logDirs: _*) val logDirsSet = mutable.Set[File]() ++= logDirs
_liveLogDirs.asScala.foreach(logDirsSet -=) _liveLogDirs.asScala.foreach(logDirsSet -=)
logDirsSet logDirsSet
} }
@ -607,7 +607,7 @@ class LogManager(logDirs: Seq[File],
partitionToLog <- logsByDir.get(dir.getAbsolutePath) partitionToLog <- logsByDir.get(dir.getAbsolutePath)
checkpoint <- recoveryPointCheckpoints.get(dir) checkpoint <- recoveryPointCheckpoints.get(dir)
} { } {
checkpoint.write(partitionToLog.mapValues(_.recoveryPoint)) checkpoint.write(partitionToLog.map { case (tp, log) => tp -> log.recoveryPoint })
} }
} }
@ -620,9 +620,9 @@ class LogManager(logDirs: Seq[File],
checkpoint <- logStartOffsetCheckpoints.get(dir) checkpoint <- logStartOffsetCheckpoints.get(dir)
} { } {
try { try {
val logStartOffsets = partitionToLog.filter { case (_, log) => val logStartOffsets = partitionToLog.collect {
log.logStartOffset > log.logSegments.head.baseOffset case (k, log) if log.logStartOffset > log.logSegments.head.baseOffset => k -> log.logStartOffset
}.mapValues(_.logStartOffset) }
checkpoint.write(logStartOffsets) checkpoint.write(logStartOffsets)
} catch { } catch {
case e: IOException => case e: IOException =>

View File

@ -26,7 +26,7 @@ import org.apache.kafka.common.errors.{InvalidTimestampException, UnsupportedCom
import org.apache.kafka.common.record.{AbstractRecords, CompressionType, InvalidRecordException, MemoryRecords, Record, RecordBatch, RecordConversionStats, TimestampType, BufferSupplier} import org.apache.kafka.common.record.{AbstractRecords, CompressionType, InvalidRecordException, MemoryRecords, Record, RecordBatch, RecordConversionStats, TimestampType, BufferSupplier}
import org.apache.kafka.common.utils.Time import org.apache.kafka.common.utils.Time
import scala.collection.mutable import scala.collection.{Seq, mutable}
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
private[kafka] object LogValidator extends Logging { private[kafka] object LogValidator extends Logging {

View File

@ -23,6 +23,7 @@ package kafka.metrics
import kafka.utils.{CoreUtils, VerifiableProperties} import kafka.utils.{CoreUtils, VerifiableProperties}
import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.Seq
import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.ArrayBuffer

View File

@ -221,10 +221,12 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
override def getAcls(principal: KafkaPrincipal): Map[Resource, Set[Acl]] = { override def getAcls(principal: KafkaPrincipal): Map[Resource, Set[Acl]] = {
inReadLock(lock) { inReadLock(lock) {
aclCache.mapValues { versionedAcls => unorderedAcls.flatMap { case (k, versionedAcls) =>
versionedAcls.acls.filter(_.principal == principal) val aclsForPrincipal = versionedAcls.acls.filter(_.principal == principal)
}.filter { case (_, acls) => if (aclsForPrincipal.nonEmpty)
acls.nonEmpty Some(k -> aclsForPrincipal)
else
None
} }
} }
} }
@ -243,7 +245,8 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
.from(Resource(resourceType, resourceName, PatternType.PREFIXED)) .from(Resource(resourceType, resourceName, PatternType.PREFIXED))
.to(Resource(resourceType, resourceName.take(1), PatternType.PREFIXED)) .to(Resource(resourceType, resourceName.take(1), PatternType.PREFIXED))
.filterKeys(resource => resourceName.startsWith(resource.name)) .filterKeys(resource => resourceName.startsWith(resource.name))
.flatMap { case (resource, versionedAcls) => versionedAcls.acls } .values
.flatMap { _.acls }
.toSet .toSet
prefixed ++ wildcard ++ literal prefixed ++ wildcard ++ literal
@ -252,7 +255,7 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
override def getAcls(): Map[Resource, Set[Acl]] = { override def getAcls(): Map[Resource, Set[Acl]] = {
inReadLock(lock) { inReadLock(lock) {
aclCache.mapValues(_.acls) unorderedAcls.map { case (k, v) => k -> v.acls }
} }
} }
@ -356,6 +359,10 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
} }
} }
// Returns Map instead of SortedMap since most callers don't care about ordering. In Scala 2.13, mapping from SortedMap
// to Map is restricted by default
private def unorderedAcls: Map[Resource, VersionedAcls] = aclCache
private def getAclsFromCache(resource: Resource): VersionedAcls = { private def getAclsFromCache(resource: Resource): VersionedAcls = {
aclCache.getOrElse(resource, throw new IllegalArgumentException(s"ACLs do not exist in the cache for resource $resource")) aclCache.getOrElse(resource, throw new IllegalArgumentException(s"ACLs do not exist in the cache for resource $resource"))
} }

View File

@ -31,7 +31,7 @@ import kafka.utils.CoreUtils.inLock
import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.protocol.Errors
import AbstractFetcherThread._ import AbstractFetcherThread._
import scala.collection.{Map, Set, mutable} import scala.collection.{Map, Seq, Set, mutable}
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.atomic.AtomicLong

View File

@ -88,8 +88,8 @@ class AdminManager(val config: KafkaConfig,
val metadata = toCreate.values.map(topic => val metadata = toCreate.values.map(topic =>
try { try {
val configs = new Properties() val configs = new Properties()
topic.configs().asScala.foreach { case entry => topic.configs.asScala.foreach { entry =>
configs.setProperty(entry.name(), entry.value()) configs.setProperty(entry.name, entry.value)
} }
LogConfig.validate(configs) LogConfig.validate(configs)
@ -131,17 +131,9 @@ class AdminManager(val config: KafkaConfig,
val javaAssignments = if (topic.assignments().isEmpty) { val javaAssignments = if (topic.assignments().isEmpty) {
null null
} else { } else {
val map = new java.util.HashMap[Integer, java.util.List[Integer]] assignments.map { case (k, v) =>
assignments.foreach { (k: java.lang.Integer) -> v.map(i => i: java.lang.Integer).asJava
case (k, v) => { }.asJava
val list = new java.util.ArrayList[Integer]
v.foreach {
case i => list.add(Integer.valueOf(i))
}
map.put(k, list)
}
}
map
} }
val javaConfigs = new java.util.HashMap[String, String] val javaConfigs = new java.util.HashMap[String, String]
topic.configs().asScala.foreach(config => javaConfigs.put(config.name(), config.value())) topic.configs().asScala.foreach(config => javaConfigs.put(config.name(), config.value()))
@ -169,7 +161,7 @@ class AdminManager(val config: KafkaConfig,
case e: Throwable => case e: Throwable =>
error(s"Error processing create topic request $topic", e) error(s"Error processing create topic request $topic", e)
CreatePartitionsMetadata(topic.name, Map(), ApiError.fromThrowable(e)) CreatePartitionsMetadata(topic.name, Map(), ApiError.fromThrowable(e))
}) }).toIndexedSeq
// 2. if timeout <= 0, validateOnly or no topics can proceed return immediately // 2. if timeout <= 0, validateOnly or no topics can proceed return immediately
if (timeout <= 0 || validateOnly || !metadata.exists(_.error.is(Errors.NONE))) { if (timeout <= 0 || validateOnly || !metadata.exists(_.error.is(Errors.NONE))) {
@ -184,9 +176,8 @@ class AdminManager(val config: KafkaConfig,
responseCallback(results) responseCallback(results)
} else { } else {
// 3. else pass the assignments and errors to the delayed operation and set the keys // 3. else pass the assignments and errors to the delayed operation and set the keys
val delayedCreate = new DelayedCreatePartitions(timeout, metadata.toSeq, this, responseCallback) val delayedCreate = new DelayedCreatePartitions(timeout, metadata, this, responseCallback)
val delayedCreateKeys = toCreate.values.map( val delayedCreateKeys = toCreate.values.map(topic => new TopicKey(topic.name)).toIndexedSeq
topic => new TopicKey(topic.name())).toSeq
// try to complete the request immediately, otherwise put it into the purgatory // try to complete the request immediately, otherwise put it into the purgatory
topicPurgatory.tryCompleteElseWatch(delayedCreate, delayedCreateKeys) topicPurgatory.tryCompleteElseWatch(delayedCreate, delayedCreateKeys)
} }

View File

@ -310,7 +310,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
} }
private def quotaLimit(metricTags: util.Map[String, String]): Double = { private def quotaLimit(metricTags: util.Map[String, String]): Double = {
Option(quotaCallback.quotaLimit(clientQuotaType, metricTags)).map(_.toDouble)getOrElse(Long.MaxValue) Option(quotaCallback.quotaLimit(clientQuotaType, metricTags)).map(_.toDouble).getOrElse(Long.MaxValue)
} }
/* /*

View File

@ -34,6 +34,7 @@ import org.apache.kafka.common.metrics.Quota._
import org.apache.kafka.common.utils.Sanitizer import org.apache.kafka.common.utils.Sanitizer
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
import scala.collection.Seq
import scala.util.Try import scala.util.Try
/** /**

View File

@ -115,7 +115,7 @@ class DelayedDeleteRecords(delayMs: Long,
* Upon completion, return the current response status along with the error code per partition * Upon completion, return the current response status along with the error code per partition
*/ */
override def onComplete() { override def onComplete() {
val responseStatus = deleteRecordsStatus.mapValues(status => status.responseStatus) val responseStatus = deleteRecordsStatus.map { case (k, status) => k -> status.responseStatus }
responseCallback(responseStatus) responseCallback(responseStatus)
} }
} }

View File

@ -34,8 +34,8 @@ class DelayedElectLeader(
responseCallback: Map[TopicPartition, ApiError] => Unit responseCallback: Map[TopicPartition, ApiError] => Unit
) extends DelayedOperation(delayMs) { ) extends DelayedOperation(delayMs) {
var waitingPartitions = expectedLeaders private var waitingPartitions = expectedLeaders
val fullResults = results.to[mutable.Set] private val fullResults = mutable.Map() ++= results
/** /**
@ -50,8 +50,8 @@ class DelayedElectLeader(
override def onComplete(): Unit = { override def onComplete(): Unit = {
// This could be called to force complete, so I need the full list of partitions, so I can time them all out. // This could be called to force complete, so I need the full list of partitions, so I can time them all out.
updateWaiting() updateWaiting()
val timedout = waitingPartitions.map{ val timedout = waitingPartitions.map {
case (tp, leader) => tp -> new ApiError(Errors.REQUEST_TIMED_OUT, null) case (tp, _) => tp -> new ApiError(Errors.REQUEST_TIMED_OUT, null)
}.toMap }.toMap
responseCallback(timedout ++ fullResults) responseCallback(timedout ++ fullResults)
} }
@ -70,7 +70,7 @@ class DelayedElectLeader(
} }
private def updateWaiting() = { private def updateWaiting() = {
waitingPartitions.foreach{case (tp, leader) => waitingPartitions.foreach { case (tp, leader) =>
val ps = replicaManager.metadataCache.getPartitionInfo(tp.topic, tp.partition) val ps = replicaManager.metadataCache.getPartitionInfo(tp.topic, tp.partition)
ps match { ps match {
case Some(ps) => case Some(ps) =>

View File

@ -125,7 +125,7 @@ class DelayedProduce(delayMs: Long,
* Upon completion, return the current response status along with the error code per partition * Upon completion, return the current response status along with the error code per partition
*/ */
override def onComplete() { override def onComplete() {
val responseStatus = produceMetadata.produceStatus.mapValues(status => status.responseStatus) val responseStatus = produceMetadata.produceStatus.map { case (k, status) => k -> status.responseStatus }
responseCallback(responseStatus) responseCallback(responseStatus)
} }
} }

View File

@ -732,7 +732,7 @@ class DynamicMetricsReporters(brokerId: Int, server: KafkaServer) extends Reconf
case reporter: Reconfigurable => dynamicConfig.maybeReconfigure(reporter, dynamicConfig.currentKafkaConfig, configs) case reporter: Reconfigurable => dynamicConfig.maybeReconfigure(reporter, dynamicConfig.currentKafkaConfig, configs)
case _ => case _ =>
} }
val added = updatedMetricsReporters -- currentReporters.keySet val added = updatedMetricsReporters.filterNot(currentReporters.keySet)
createReporters(added.asJava, configs) createReporters(added.asJava, configs)
} }
@ -844,9 +844,9 @@ class DynamicListenerConfig(server: KafkaServer) extends BrokerReconfigurable wi
def validateReconfiguration(newConfig: KafkaConfig): Unit = { def validateReconfiguration(newConfig: KafkaConfig): Unit = {
def immutableListenerConfigs(kafkaConfig: KafkaConfig, prefix: String): Map[String, AnyRef] = { def immutableListenerConfigs(kafkaConfig: KafkaConfig, prefix: String): Map[String, AnyRef] = {
newConfig.originals.asScala newConfig.originals.asScala.filter { case (key, _) =>
.filterKeys(_.startsWith(prefix)) key.startsWith(prefix) && !DynamicSecurityConfigs.contains(key)
.filterKeys(k => !DynamicSecurityConfigs.contains(k)) }
} }
val oldConfig = server.config val oldConfig = server.config
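`filterKeys` has the same issue as `mapValues`: on 2.13 it returns a `MapView`, so the two chained `filterKeys` calls are folded into one strict `filter` over the entries. A small sketch with a hypothetical prefix and exclusion set:

```scala
// Hypothetical stand-ins for the real config prefix and security-config set.
val DynamicSecurityConfigs = Set("listener.name.internal.ssl.keystore.password")
val originals: Map[String, AnyRef] = Map(
  "listener.name.internal.ssl.keystore.password" -> "secret",
  "listener.name.internal.ssl.client.auth"       -> "required"
)

def listenerConfigs(prefix: String): Map[String, AnyRef] =
  // filterKeys would hand back a view on 2.13; one filter over the entries
  // keeps the result a strict Map on every supported Scala version.
  originals.filter { case (key, _) =>
    key.startsWith(prefix) && !DynamicSecurityConfigs.contains(key)
  }

listenerConfigs("listener.name.internal.")   // keeps only the non-security key
```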


@ -83,7 +83,7 @@ import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{Node, TopicPartition} import org.apache.kafka.common.{Node, TopicPartition}
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
import scala.collection._ import scala.collection.{Map, Seq, Set, immutable, mutable}
import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.ArrayBuffer
import scala.util.{Failure, Success, Try} import scala.util.{Failure, Success, Try}
@ -316,7 +316,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val unauthorizedTopicErrors = mutable.Map[TopicPartition, Errors]() val unauthorizedTopicErrors = mutable.Map[TopicPartition, Errors]()
val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]() val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]()
// the callback for sending an offset commit response // the callback for sending an offset commit response
def sendResponseCallback(commitStatus: immutable.Map[TopicPartition, Errors]) { def sendResponseCallback(commitStatus: Map[TopicPartition, Errors]) {
val combinedCommitStatus = commitStatus ++ unauthorizedTopicErrors ++ nonExistingTopicErrors val combinedCommitStatus = commitStatus ++ unauthorizedTopicErrors ++ nonExistingTopicErrors
if (isDebugEnabled) if (isDebugEnabled)
combinedCommitStatus.foreach { case (topicPartition, error) => combinedCommitStatus.foreach { case (topicPartition, error) =>
@ -402,13 +402,13 @@ class KafkaApis(val requestChannel: RequestChannel,
// - If v2/v3/v4 (no explicit commit timestamp) we treat it the same as v5. // - If v2/v3/v4 (no explicit commit timestamp) we treat it the same as v5.
// - For v5 and beyond there is no per partition expiration timestamp, so this field is no longer in effect // - For v5 and beyond there is no per partition expiration timestamp, so this field is no longer in effect
val currentTimestamp = time.milliseconds val currentTimestamp = time.milliseconds
val partitionData = authorizedTopicRequestInfo.mapValues { partitionData => val partitionData = authorizedTopicRequestInfo.map { case (k, partitionData) =>
val metadata = if (partitionData.committedMetadata() == null) val metadata = if (partitionData.committedMetadata() == null)
OffsetAndMetadata.NoMetadata OffsetAndMetadata.NoMetadata
else else
partitionData.committedMetadata() partitionData.committedMetadata()
new OffsetAndMetadata( k -> new OffsetAndMetadata(
offset = partitionData.committedOffset(), offset = partitionData.committedOffset(),
leaderEpoch = Optional.ofNullable[Integer](partitionData.committedLeaderEpoch), leaderEpoch = Optional.ofNullable[Integer](partitionData.committedLeaderEpoch),
metadata = metadata, metadata = metadata,
@ -1595,22 +1595,21 @@ class KafkaApis(val requestChannel: RequestChannel,
} }
}) })
val toCreate = mutable.Map[String, CreatableTopic]() val toCreate = mutable.Map[String, CreatableTopic]()
createTopicsRequest.data.topics.asScala.foreach { case topic => createTopicsRequest.data.topics.asScala.foreach { topic =>
if (results.find(topic.name()).errorCode() == 0) { if (results.find(topic.name).errorCode == Errors.NONE.code) {
toCreate += topic.name() -> topic toCreate += topic.name -> topic
} }
} }
def handleCreateTopicsResults(errors: Map[String, ApiError]): Unit = { def handleCreateTopicsResults(errors: Map[String, ApiError]): Unit = {
errors.foreach { errors.foreach { case (topicName, error) =>
case (topicName, error) => results.find(topicName).
results.find(topicName). setErrorCode(error.error.code).
setErrorCode(error.error().code()). setErrorMessage(error.message)
setErrorMessage(error.message())
} }
sendResponseCallback(results) sendResponseCallback(results)
} }
adminManager.createTopics(createTopicsRequest.data.timeoutMs(), adminManager.createTopics(createTopicsRequest.data.timeoutMs,
createTopicsRequest.data.validateOnly(), createTopicsRequest.data.validateOnly,
toCreate, toCreate,
handleCreateTopicsResults) handleCreateTopicsResults)
} }
@ -1849,7 +1848,7 @@ class KafkaApis(val requestChannel: RequestChannel,
*/ */
def maybeSendResponseCallback(producerId: Long, result: TransactionResult)(responseStatus: Map[TopicPartition, PartitionResponse]): Unit = { def maybeSendResponseCallback(producerId: Long, result: TransactionResult)(responseStatus: Map[TopicPartition, PartitionResponse]): Unit = {
trace(s"End transaction marker append for producer id $producerId completed with status: $responseStatus") trace(s"End transaction marker append for producer id $producerId completed with status: $responseStatus")
val currentErrors = new ConcurrentHashMap[TopicPartition, Errors](responseStatus.mapValues(_.error).asJava) val currentErrors = new ConcurrentHashMap[TopicPartition, Errors](responseStatus.map { case (k, v) => k -> v.error }.asJava)
updateErrors(producerId, currentErrors) updateErrors(producerId, currentErrors)
val successfulOffsetsPartitions = responseStatus.filter { case (topicPartition, partitionResponse) => val successfulOffsetsPartitions = responseStatus.filter { case (topicPartition, partitionResponse) =>
topicPartition.topic == GROUP_METADATA_TOPIC_NAME && partitionResponse.error == Errors.NONE topicPartition.topic == GROUP_METADATA_TOPIC_NAME && partitionResponse.error == Errors.NONE
@ -2503,7 +2502,7 @@ class KafkaApis(val requestChannel: RequestChannel,
if (!authorize(request.session, Alter, Resource.ClusterResource)) { if (!authorize(request.session, Alter, Resource.ClusterResource)) {
val error = new ApiError(Errors.CLUSTER_AUTHORIZATION_FAILED, null) val error = new ApiError(Errors.CLUSTER_AUTHORIZATION_FAILED, null)
val partitionErrors: Map[TopicPartition, ApiError] = val partitionErrors: Map[TopicPartition, ApiError] =
electionRequest.topicPartitions.map(partition => partition -> error)(breakOut) electionRequest.topicPartitions.iterator.map(partition => partition -> error).toMap
sendResponseCallback(error)(partitionErrors) sendResponseCallback(error)(partitionErrors)
} else { } else {
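`breakOut` is gone in 2.13, so `map(...)(breakOut)` conversions like the one above become an explicit `iterator.map(...).toMap` (or `.toIndexedSeq`/`.toSet` elsewhere in this patch), which keeps the single pass without the intermediate collection. Sketch with a couple of hypothetical partitions:

```scala
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.ApiError

val topicPartitions = Set(new TopicPartition("events", 0), new TopicPartition("events", 1))
val error = new ApiError(Errors.CLUSTER_AUTHORIZATION_FAILED, null)

// 2.11/2.12 could write topicPartitions.map(tp => tp -> error)(breakOut) to build the
// Map directly; going through an iterator and calling toMap is the portable equivalent.
val partitionErrors: Map[TopicPartition, ApiError] =
  topicPartitions.iterator.map(tp => tp -> error).toMap
```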


@ -39,7 +39,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.Utils import org.apache.kafka.common.utils.Utils
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
import scala.collection.Map import scala.collection.{Map, Seq}
object Defaults { object Defaults {
/** ********* Zookeeper Configuration ***********/ /** ********* Zookeeper Configuration ***********/


@ -22,6 +22,8 @@ import java.util.Properties
import kafka.metrics.KafkaMetricsReporter import kafka.metrics.KafkaMetricsReporter
import kafka.utils.{Exit, Logging, VerifiableProperties} import kafka.utils.{Exit, Logging, VerifiableProperties}
import scala.collection.Seq
object KafkaServerStartable { object KafkaServerStartable {
def fromProps(serverProps: Properties) = { def fromProps(serverProps: Properties) = {
val reporters = KafkaMetricsReporter.startReporters(new VerifiableProperties(serverProps)) val reporters = KafkaMetricsReporter.startReporters(new VerifiableProperties(serverProps))
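The explicit `import scala.collection.Seq` added here (and in many files below) pins down which `Seq` the signatures mean: the unqualified alias is `scala.collection.Seq` on 2.11/2.12 but `scala.collection.immutable.Seq` on 2.13, so without the import the same source would change meaning between versions and callers passing mutable sequences would stop compiling. A sketch with a hypothetical helper (`startAll` and the reporter name are made up):

```scala
import scala.collection.Seq   // keep the 2.11/2.12 meaning of Seq on 2.13 as well

// Without the import, this parameter is immutable.Seq on 2.13 and the call below fails.
def startAll(reporterClasses: Seq[String]): Unit =
  reporterClasses.foreach(name => println(s"starting $name"))

startAll(scala.collection.mutable.ArrayBuffer("metrics.ExampleReporter"))
```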


@ -45,11 +45,11 @@ import org.apache.kafka.common.requests.EpochEndOffset._
import org.apache.kafka.common.requests.FetchRequest.PartitionData import org.apache.kafka.common.requests.FetchRequest.PartitionData
import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.requests._ import org.apache.kafka.common.requests.{ApiError, DeleteRecordsResponse, DescribeLogDirsResponse, EpochEndOffset, IsolationLevel, LeaderAndIsrRequest, LeaderAndIsrResponse, OffsetsForLeaderEpochRequest, StopReplicaRequest, UpdateMetadataRequest}
import org.apache.kafka.common.utils.Time import org.apache.kafka.common.utils.Time
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
import scala.collection._ import scala.collection.{Map, Seq, Set, mutable}
/* /*
* Result metadata of a log append operation on the log * Result metadata of a log append operation on the log
@ -193,7 +193,7 @@ class ReplicaManager(val config: KafkaConfig,
val replicaFetcherManager = createReplicaFetcherManager(metrics, time, threadNamePrefix, quotaManagers.follower) val replicaFetcherManager = createReplicaFetcherManager(metrics, time, threadNamePrefix, quotaManagers.follower)
val replicaAlterLogDirsManager = createReplicaAlterLogDirsManager(quotaManagers.alterLogDirs, brokerTopicStats) val replicaAlterLogDirsManager = createReplicaAlterLogDirsManager(quotaManagers.alterLogDirs, brokerTopicStats)
private val highWatermarkCheckPointThreadStarted = new AtomicBoolean(false) private val highWatermarkCheckPointThreadStarted = new AtomicBoolean(false)
@volatile var highWatermarkCheckpoints = logManager.liveLogDirs.map(dir => @volatile var highWatermarkCheckpoints: Map[String, OffsetCheckpointFile] = logManager.liveLogDirs.map(dir =>
(dir.getAbsolutePath, new OffsetCheckpointFile(new File(dir, ReplicaManager.HighWatermarkFilename), logDirFailureChannel))).toMap (dir.getAbsolutePath, new OffsetCheckpointFile(new File(dir, ReplicaManager.HighWatermarkFilename), logDirFailureChannel))).toMap
this.logIdent = s"[ReplicaManager broker=$localBrokerId] " this.logIdent = s"[ReplicaManager broker=$localBrokerId] "
@ -487,7 +487,7 @@ class ReplicaManager(val config: KafkaConfig,
new PartitionResponse(result.error, result.info.firstOffset.getOrElse(-1), result.info.logAppendTime, result.info.logStartOffset)) // response status new PartitionResponse(result.error, result.info.firstOffset.getOrElse(-1), result.info.logAppendTime, result.info.logStartOffset)) // response status
} }
recordConversionStatsCallback(localProduceResults.mapValues(_.info.recordConversionStats)) recordConversionStatsCallback(localProduceResults.map { case (k, v) => k -> v.info.recordConversionStats })
if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, localProduceResults)) { if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, localProduceResults)) {
// create delayed produce operation // create delayed produce operation
@ -504,7 +504,7 @@ class ReplicaManager(val config: KafkaConfig,
} else { } else {
// we can respond immediately // we can respond immediately
val produceResponseStatus = produceStatus.mapValues(status => status.responseStatus) val produceResponseStatus = produceStatus.map { case (k, status) => k -> status.responseStatus }
responseCallback(produceResponseStatus) responseCallback(produceResponseStatus)
} }
} else { } else {
@ -709,7 +709,7 @@ class ReplicaManager(val config: KafkaConfig,
delayedDeleteRecordsPurgatory.tryCompleteElseWatch(delayedDeleteRecords, deleteRecordsRequestKeys) delayedDeleteRecordsPurgatory.tryCompleteElseWatch(delayedDeleteRecords, deleteRecordsRequestKeys)
} else { } else {
// we can respond immediately // we can respond immediately
val deleteRecordsResponseStatus = deleteRecordsStatus.mapValues(status => status.responseStatus) val deleteRecordsResponseStatus = deleteRecordsStatus.map { case (k, status) => k -> status.responseStatus }
responseCallback(deleteRecordsResponseStatus) responseCallback(deleteRecordsResponseStatus)
} }
} }
@ -1481,7 +1481,7 @@ class ReplicaManager(val config: KafkaConfig,
newOfflinePartitions.map(_.topic).foreach { topic: String => newOfflinePartitions.map(_.topic).foreach { topic: String =>
maybeRemoveTopicMetrics(topic) maybeRemoveTopicMetrics(topic)
} }
highWatermarkCheckpoints = highWatermarkCheckpoints.filterKeys(_ != dir) highWatermarkCheckpoints = highWatermarkCheckpoints.filter { case (checkpointDir, _) => checkpointDir != dir }
info(s"Broker $localBrokerId stopped fetcher for partitions ${newOfflinePartitions.mkString(",")} and stopped moving logs " + info(s"Broker $localBrokerId stopped fetcher for partitions ${newOfflinePartitions.mkString(",")} and stopped moving logs " +
s"for partitions ${partitionsWithOfflineFutureReplica.mkString(",")} because they are in the failed log directory $dir.") s"for partitions ${partitionsWithOfflineFutureReplica.mkString(",")} because they are in the failed log directory $dir.")
@ -1566,9 +1566,9 @@ class ReplicaManager(val config: KafkaConfig,
} }
if (expectedLeaders.nonEmpty) { if (expectedLeaders.nonEmpty) {
val watchKeys: Seq[TopicPartitionOperationKey] = expectedLeaders.map{ val watchKeys = expectedLeaders.iterator.map {
case (tp, _) => TopicPartitionOperationKey(tp) case (tp, _) => TopicPartitionOperationKey(tp)
}(breakOut) }.toIndexedSeq
delayedElectLeaderPurgatory.tryCompleteElseWatch( delayedElectLeaderPurgatory.tryCompleteElseWatch(
new DelayedElectLeader( new DelayedElectLeader(


@ -17,13 +17,15 @@
package kafka.server package kafka.server
import java.util.concurrent.{ConcurrentHashMap, TimeUnit} import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
import java.util.concurrent.locks.ReentrantReadWriteLock
import scala.collection.Seq
import kafka.server.Constants._ import kafka.server.Constants._
import kafka.server.ReplicationQuotaManagerConfig._ import kafka.server.ReplicationQuotaManagerConfig._
import kafka.utils.CoreUtils._ import kafka.utils.CoreUtils._
import kafka.utils.Logging import kafka.utils.Logging
import org.apache.kafka.common.metrics._ import org.apache.kafka.common.metrics._
import java.util.concurrent.locks.ReentrantReadWriteLock
import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.metrics.stats.SimpleRate import org.apache.kafka.common.metrics.stats.SimpleRate


@ -45,7 +45,7 @@ class CheckpointFile[T](val file: File,
try Files.createFile(file.toPath) // create the file if it doesn't exist try Files.createFile(file.toPath) // create the file if it doesn't exist
catch { case _: FileAlreadyExistsException => } catch { case _: FileAlreadyExistsException => }
def write(entries: Seq[T]) { def write(entries: Iterable[T]) {
lock synchronized { lock synchronized {
try { try {
// write to temp file and then swap with the existing file // write to temp file and then swap with the existing file
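Widening `write` from `Seq[T]` to `Iterable[T]` lets callers such as OffsetCheckpointFile (next hunk) hand over a map's entries directly instead of copying them with `toSeq` first. A heavily simplified sketch of that idea; `SimpleCheckpoint` and `formatLine` are made up, and the real class additionally handles versioning, temp files and atomic swaps:

```scala
import java.io.File
import java.nio.charset.StandardCharsets
import java.nio.file.Files

class SimpleCheckpoint[T](file: File, formatLine: T => String) {
  // Accepting Iterable[T] means any collection (including a Map's entries) can be written.
  def write(entries: Iterable[T]): Unit = {
    val content = entries.iterator.map(formatLine).mkString("", "\n", "\n")
    Files.write(file.toPath, content.getBytes(StandardCharsets.UTF_8))
  }
}

// A Map is an Iterable[(K, V)], so no toSeq copy is needed before writing it out.
val checkpoint = new SimpleCheckpoint[(String, Long)](new File("/tmp/offsets"),
  { case (partition, offset) => s"$partition $offset" })
checkpoint.write(Map("topic-0" -> 42L))
```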


@ -56,7 +56,7 @@ class OffsetCheckpointFile(val file: File, logDirFailureChannel: LogDirFailureCh
val checkpoint = new CheckpointFile[(TopicPartition, Long)](file, OffsetCheckpointFile.CurrentVersion, val checkpoint = new CheckpointFile[(TopicPartition, Long)](file, OffsetCheckpointFile.CurrentVersion,
OffsetCheckpointFile.Formatter, logDirFailureChannel, file.getParent) OffsetCheckpointFile.Formatter, logDirFailureChannel, file.getParent)
def write(offsets: Map[TopicPartition, Long]): Unit = checkpoint.write(offsets.toSeq) def write(offsets: Map[TopicPartition, Long]): Unit = checkpoint.write(offsets)
def read(): Map[TopicPartition, Long] = checkpoint.read().toMap def read(): Map[TopicPartition, Long] = checkpoint.read().toMap


@ -24,7 +24,8 @@ import kafka.utils.CoreUtils._
import kafka.utils.Logging import kafka.utils.Logging
import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.TopicPartition
import scala.collection.mutable.ListBuffer import scala.collection.Seq
import scala.collection.mutable.ArrayBuffer
/** /**
* Represents a cache of (LeaderEpoch => Offset) mappings for a particular replica. * Represents a cache of (LeaderEpoch => Offset) mappings for a particular replica.
@ -42,7 +43,10 @@ class LeaderEpochFileCache(topicPartition: TopicPartition,
this.logIdent = s"[LeaderEpochCache $topicPartition] " this.logIdent = s"[LeaderEpochCache $topicPartition] "
private val lock = new ReentrantReadWriteLock() private val lock = new ReentrantReadWriteLock()
private var epochs: ListBuffer[EpochEntry] = inWriteLock(lock) { ListBuffer(checkpoint.read(): _*) } private var epochs: ArrayBuffer[EpochEntry] = inWriteLock(lock) {
val read = checkpoint.read()
new ArrayBuffer(read.size) ++= read
}
/** /**
* Assigns the supplied Leader Epoch to the supplied Offset * Assigns the supplied Leader Epoch to the supplied Offset
@ -223,7 +227,7 @@ class LeaderEpochFileCache(topicPartition: TopicPartition,
} }
// Visible for testing // Visible for testing
def epochEntries: ListBuffer[EpochEntry] = { def epochEntries: Seq[EpochEntry] = {
epochs epochs
} }


@ -28,6 +28,7 @@ import org.apache.kafka.common.requests.ListOffsetRequest
import org.apache.kafka.common.serialization.ByteArrayDeserializer import org.apache.kafka.common.serialization.ByteArrayDeserializer
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
import scala.collection.Seq
object GetOffsetShell { object GetOffsetShell {
@ -127,7 +128,9 @@ object GetOffsetShell {
case ListOffsetRequest.LATEST_TIMESTAMP => consumer.endOffsets(topicPartitions.asJava).asScala case ListOffsetRequest.LATEST_TIMESTAMP => consumer.endOffsets(topicPartitions.asJava).asScala
case _ => case _ =>
val timestampsToSearch = topicPartitions.map(tp => tp -> (listOffsetsTimestamp: java.lang.Long)).toMap.asJava val timestampsToSearch = topicPartitions.map(tp => tp -> (listOffsetsTimestamp: java.lang.Long)).toMap.asJava
consumer.offsetsForTimes(timestampsToSearch).asScala.mapValues(x => if (x == null) null else x.offset) consumer.offsetsForTimes(timestampsToSearch).asScala.map { case (k, x) =>
if (x == null) (k, null) else (k, x.offset: java.lang.Long)
}
} }
partitionOffsets.toSeq.sortBy { case (tp, _) => tp.partition }.foreach { case (tp, offset) => partitionOffsets.toSeq.sortBy { case (tp, _) => tp.partition }.foreach { case (tp, offset) =>


@ -43,6 +43,7 @@ import org.apache.kafka.common.utils.{LogContext, Time}
import org.apache.kafka.common.{Node, TopicPartition} import org.apache.kafka.common.{Node, TopicPartition}
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
import scala.collection.Seq
/** /**
* For verifying the consistency among replicas. * For verifying the consistency among replicas.


@ -38,6 +38,7 @@ import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.utils.Exit; import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils; import org.apache.kafka.common.utils.Utils;
import scala.collection.JavaConverters;
import java.io.IOException; import java.io.IOException;
import java.text.ParseException; import java.text.ParseException;
@ -261,22 +262,29 @@ public class StreamsResetter {
CommandLineUtils.printUsageAndDie(optionParser, "Only one of --dry-run and --execute can be specified"); CommandLineUtils.printUsageAndDie(optionParser, "Only one of --dry-run and --execute can be specified");
} }
final scala.collection.immutable.HashSet<OptionSpec<?>> allScenarioOptions = new scala.collection.immutable.HashSet<>(); final Set<OptionSpec<?>> allScenarioOptions = new HashSet<>();
allScenarioOptions.$plus(toOffsetOption); allScenarioOptions.add(toOffsetOption);
allScenarioOptions.$plus(toDatetimeOption); allScenarioOptions.add(toDatetimeOption);
allScenarioOptions.$plus(byDurationOption); allScenarioOptions.add(byDurationOption);
allScenarioOptions.$plus(toEarliestOption); allScenarioOptions.add(toEarliestOption);
allScenarioOptions.$plus(toLatestOption); allScenarioOptions.add(toLatestOption);
allScenarioOptions.$plus(fromFileOption); allScenarioOptions.add(fromFileOption);
allScenarioOptions.$plus(shiftByOption); allScenarioOptions.add(shiftByOption);
CommandLineUtils.checkInvalidArgs(optionParser, options, toOffsetOption, allScenarioOptions.$minus(toOffsetOption)); checkInvalidArgs(optionParser, options, allScenarioOptions, toOffsetOption);
CommandLineUtils.checkInvalidArgs(optionParser, options, toDatetimeOption, allScenarioOptions.$minus(toDatetimeOption)); checkInvalidArgs(optionParser, options, allScenarioOptions, toDatetimeOption);
CommandLineUtils.checkInvalidArgs(optionParser, options, byDurationOption, allScenarioOptions.$minus(byDurationOption)); checkInvalidArgs(optionParser, options, allScenarioOptions, byDurationOption);
CommandLineUtils.checkInvalidArgs(optionParser, options, toEarliestOption, allScenarioOptions.$minus(toEarliestOption)); checkInvalidArgs(optionParser, options, allScenarioOptions, toEarliestOption);
CommandLineUtils.checkInvalidArgs(optionParser, options, toLatestOption, allScenarioOptions.$minus(toLatestOption)); checkInvalidArgs(optionParser, options, allScenarioOptions, toLatestOption);
CommandLineUtils.checkInvalidArgs(optionParser, options, fromFileOption, allScenarioOptions.$minus(fromFileOption)); checkInvalidArgs(optionParser, options, allScenarioOptions, fromFileOption);
CommandLineUtils.checkInvalidArgs(optionParser, options, shiftByOption, allScenarioOptions.$minus(shiftByOption)); checkInvalidArgs(optionParser, options, allScenarioOptions, shiftByOption);
}
private <T> void checkInvalidArgs(OptionParser optionParser, OptionSet options, Set<OptionSpec<?>> allOptions,
OptionSpec<T> option) {
Set<OptionSpec<?>> invalidOptions = new HashSet<>(allOptions);
invalidOptions.remove(option);
CommandLineUtils.checkInvalidArgs(optionParser, options, option, JavaConverters.asScalaSetConverter(invalidOptions).asScala());
} }
private int maybeResetInputAndSeekToEndIntermediateTopicOffsets(final Map consumerConfig, private int maybeResetInputAndSeekToEndIntermediateTopicOffsets(final Map consumerConfig,


@ -28,7 +28,7 @@ import com.typesafe.scalalogging.Logger
import javax.management._ import javax.management._
import scala.collection._ import scala.collection._
import scala.collection.mutable import scala.collection.{Seq, mutable}
import kafka.cluster.EndPoint import kafka.cluster.EndPoint
import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.security.auth.SecurityProtocol
@ -50,10 +50,10 @@ object CoreUtils {
private val logger = Logger(getClass) private val logger = Logger(getClass)
/** /**
* Return the smallest element in `traversable` if it is not empty. Otherwise return `ifEmpty`. * Return the smallest element in `iterable` if it is not empty. Otherwise return `ifEmpty`.
*/ */
def min[A, B >: A](traversable: TraversableOnce[A], ifEmpty: A)(implicit cmp: Ordering[B]): A = def min[A, B >: A](iterable: Iterable[A], ifEmpty: A)(implicit cmp: Ordering[B]): A =
if (traversable.isEmpty) ifEmpty else traversable.min(cmp) if (iterable.isEmpty) ifEmpty else iterable.min(cmp)
/** /**
* Wrap the given function in a java.lang.Runnable * Wrap the given function in a java.lang.Runnable
@ -260,31 +260,6 @@ object CoreUtils {
def inWriteLock[T](lock: ReadWriteLock)(fun: => T): T = inLock[T](lock.writeLock)(fun) def inWriteLock[T](lock: ReadWriteLock)(fun: => T): T = inLock[T](lock.writeLock)(fun)
//JSON strings need to be escaped based on ECMA-404 standard http://json.org
def JSONEscapeString (s : String) : String = {
s.map {
case '"' => "\\\""
case '\\' => "\\\\"
case '/' => "\\/"
case '\b' => "\\b"
case '\f' => "\\f"
case '\n' => "\\n"
case '\r' => "\\r"
case '\t' => "\\t"
/* We'll unicode escape any control characters. These include:
* 0x0 -> 0x1f : ASCII Control (C0 Control Codes)
* 0x7f : ASCII DELETE
* 0x80 -> 0x9f : C1 Control Codes
*
* Per RFC4627, section 2.5, we're not technically required to
* encode the C1 codes, but we do to be safe.
*/
case c if (c >= '\u0000' && c <= '\u001f') || (c >= '\u007f' && c <= '\u009f') => "\\u%04x".format(c: Int)
case c => c
}.mkString
}
/** /**
* Returns a list of duplicated items * Returns a list of duplicated items
*/ */
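`CoreUtils.min` switches from `TraversableOnce` to `Iterable` because `TraversableOnce` survives in 2.13 only as a deprecated alias for `IterableOnce`, while `Iterable` is available with the same meaning on 2.11, 2.12 and 2.13, making it the portable choice for a cross-built signature. Call sites are unchanged; a quick usage sketch:

```scala
def min[A, B >: A](iterable: Iterable[A], ifEmpty: A)(implicit cmp: Ordering[B]): A =
  if (iterable.isEmpty) ifEmpty else iterable.min(cmp)

min(Seq(3, 1, 2), ifEmpty = 0)    // 1
min(Seq.empty[Int], ifEmpty = 0)  // 0, the fallback for an empty input
```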


@ -17,10 +17,10 @@
package kafka.utils.json package kafka.utils.json
import scala.collection._ import scala.collection.{Map, Seq}
import scala.collection.compat._
import scala.language.higherKinds import scala.language.higherKinds
import JavaConverters._ import scala.collection.JavaConverters._
import generic.CanBuildFrom
import com.fasterxml.jackson.databind.{JsonMappingException, JsonNode} import com.fasterxml.jackson.databind.{JsonMappingException, JsonNode}
@ -88,7 +88,7 @@ object DecodeJson {
} }
} }
implicit def decodeSeq[E, S[+T] <: Seq[E]](implicit decodeJson: DecodeJson[E], cbf: CanBuildFrom[Nothing, E, S[E]]): DecodeJson[S[E]] = new DecodeJson[S[E]] { implicit def decodeSeq[E, S[+T] <: Seq[E]](implicit decodeJson: DecodeJson[E], factory: Factory[E, S[E]]): DecodeJson[S[E]] = new DecodeJson[S[E]] {
def decodeEither(node: JsonNode): Either[String, S[E]] = { def decodeEither(node: JsonNode): Either[String, S[E]] = {
if (node.isArray) if (node.isArray)
decodeIterator(node.elements.asScala)(decodeJson.decodeEither) decodeIterator(node.elements.asScala)(decodeJson.decodeEither)
@ -96,16 +96,16 @@ object DecodeJson {
} }
} }
implicit def decodeMap[V, M[K, +V] <: Map[K, V]](implicit decodeJson: DecodeJson[V], cbf: CanBuildFrom[Nothing, (String, V), M[String, V]]): DecodeJson[M[String, V]] = new DecodeJson[M[String, V]] { implicit def decodeMap[V, M[K, +V] <: Map[K, V]](implicit decodeJson: DecodeJson[V], factory: Factory[(String, V), M[String, V]]): DecodeJson[M[String, V]] = new DecodeJson[M[String, V]] {
def decodeEither(node: JsonNode): Either[String, M[String, V]] = { def decodeEither(node: JsonNode): Either[String, M[String, V]] = {
if (node.isObject) if (node.isObject)
decodeIterator(node.fields.asScala)(e => decodeJson.decodeEither(e.getValue).right.map(v => (e.getKey, v)))(cbf) decodeIterator(node.fields.asScala)(e => decodeJson.decodeEither(e.getValue).right.map(v => (e.getKey, v)))
else Left(s"Expected JSON object, received $node") else Left(s"Expected JSON object, received $node")
} }
} }
private def decodeIterator[S, T, C](it: Iterator[S])(f: S => Either[String, T])(implicit cbf: CanBuildFrom[Nothing, T, C]): Either[String, C] = { private def decodeIterator[S, T, C](it: Iterator[S])(f: S => Either[String, T])(implicit factory: Factory[T, C]): Either[String, C] = {
val result = cbf() val result = factory.newBuilder
while (it.hasNext) { while (it.hasNext) {
f(it.next) match { f(it.next) match {
case Right(x) => result += x case Right(x) => result += x
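`Factory` is the 2.13 replacement for `CanBuildFrom` as the implicit capability "build a C out of elements of type T", and scala-collection-compat back-ports it (including `newBuilder`) to 2.11/2.12, which is why `decodeIterator` can simply ask the factory for a builder. A standalone sketch of the same pattern, assuming the compat library is on the classpath; `collectRights` is a made-up helper:

```scala
import scala.collection.compat._   // Factory and friends where the standard library lacks them

// Collect Right values into any collection C for which a Factory exists,
// stopping at the first Left, the same shape as decodeIterator above.
def collectRights[A, C](it: Iterator[Either[String, A]])(implicit factory: Factory[A, C]): Either[String, C] = {
  val builder = factory.newBuilder
  var failure: Option[String] = None
  while (failure.isEmpty && it.hasNext) {
    it.next() match {
      case Right(a)  => builder += a
      case Left(err) => failure = Some(err)
    }
  }
  failure.toLeft(builder.result())
}

collectRights[Int, Vector[Int]](Iterator(Right(1), Right(2)))   // Right(Vector(1, 2))
collectRights[Int, Set[Int]](Iterator(Right(1), Left("boom")))  // Left("boom")
```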


@ -38,9 +38,7 @@ import org.apache.zookeeper.KeeperException.{Code, NodeExistsException}
import org.apache.zookeeper.OpResult.{CreateResult, ErrorResult, SetDataResult} import org.apache.zookeeper.OpResult.{CreateResult, ErrorResult, SetDataResult}
import org.apache.zookeeper.data.{ACL, Stat} import org.apache.zookeeper.data.{ACL, Stat}
import org.apache.zookeeper.{CreateMode, KeeperException, ZooKeeper} import org.apache.zookeeper.{CreateMode, KeeperException, ZooKeeper}
import scala.collection.Seq import scala.collection.{Map, Seq, mutable}
import scala.collection.breakOut
import scala.collection.mutable
/** /**
* Provides higher level Kafka-specific operations on top of the pipelined [[kafka.zookeeper.ZooKeeperClient]]. * Provides higher level Kafka-specific operations on top of the pipelined [[kafka.zookeeper.ZooKeeperClient]].
@ -261,11 +259,11 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
} catch { } catch {
case e: ControllerMovedException => throw e case e: ControllerMovedException => throw e
case e: Exception => case e: Exception =>
return UpdateLeaderAndIsrResult(leaderAndIsrs.keys.map(_ -> Left(e))(breakOut), Seq.empty) return UpdateLeaderAndIsrResult(leaderAndIsrs.keys.iterator.map(_ -> Left(e)).toMap, Seq.empty)
} }
val updatesToRetry = mutable.Buffer.empty[TopicPartition] val updatesToRetry = mutable.Buffer.empty[TopicPartition]
val finished: Map[TopicPartition, Either[Exception, LeaderAndIsr]] = setDataResponses.flatMap { setDataResponse => val finished = setDataResponses.iterator.flatMap { setDataResponse =>
val partition = setDataResponse.ctx.get.asInstanceOf[TopicPartition] val partition = setDataResponse.ctx.get.asInstanceOf[TopicPartition]
setDataResponse.resultCode match { setDataResponse.resultCode match {
case Code.OK => case Code.OK =>
@ -278,7 +276,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
case _ => case _ =>
Some(partition -> Left(setDataResponse.resultException.get)) Some(partition -> Left(setDataResponse.resultException.get))
} }
}(breakOut) }.toMap
UpdateLeaderAndIsrResult(finished, updatesToRetry) UpdateLeaderAndIsrResult(finished, updatesToRetry)
} }
@ -1626,7 +1624,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
retryRequestsUntilConnected(createRequests, expectedControllerEpochZkVersion) retryRequestsUntilConnected(createRequests, expectedControllerEpochZkVersion)
} }
private def createTopicPartitions(topics: Seq[String], expectedControllerEpochZkVersion: Int):Seq[CreateResponse] = { private def createTopicPartitions(topics: Seq[String], expectedControllerEpochZkVersion: Int): Seq[CreateResponse] = {
val createRequests = topics.map { topic => val createRequests = topics.map { topic =>
val path = TopicPartitionsZNode.path(topic) val path = TopicPartitionsZNode.path(topic)
CreateRequest(path, null, defaultAcls(path), CreateMode.PERSISTENT, Some(topic)) CreateRequest(path, null, defaultAcls(path), CreateMode.PERSISTENT, Some(topic))
@ -1635,9 +1633,9 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
} }
private def getTopicConfigs(topics: Set[String]): Seq[GetDataResponse] = { private def getTopicConfigs(topics: Set[String]): Seq[GetDataResponse] = {
val getDataRequests: Seq[GetDataRequest] = topics.map { topic => val getDataRequests: Seq[GetDataRequest] = topics.iterator.map { topic =>
GetDataRequest(ConfigEntityZNode.path(ConfigType.Topic, topic), ctx = Some(topic)) GetDataRequest(ConfigEntityZNode.path(ConfigType.Topic, topic), ctx = Some(topic))
}(breakOut) }.toIndexedSeq
retryRequestsUntilConnected(getDataRequests) retryRequestsUntilConnected(getDataRequests)
} }
@ -1662,7 +1660,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
} }
private def retryRequestsUntilConnected[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] = { private def retryRequestsUntilConnected[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] = {
val remainingRequests = mutable.ArrayBuffer(requests: _*) val remainingRequests = new mutable.ArrayBuffer(requests.size) ++= requests
val responses = new mutable.ArrayBuffer[Req#Response] val responses = new mutable.ArrayBuffer[Req#Response]
while (remainingRequests.nonEmpty) { while (remainingRequests.nonEmpty) {
val batchResponses = zooKeeperClient.handleRequests(remainingRequests) val batchResponses = zooKeeperClient.handleRequests(remainingRequests)
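`mutable.ArrayBuffer(requests: _*)` no longer compiles on 2.13 because varargs there only accept an array or an `immutable.Seq`, and `requests` is a plain `collection.Seq`. Pre-sizing a buffer and appending works on all three versions and also skips the intermediate copy. Sketch with a placeholder request type:

```scala
import scala.collection.{Seq, mutable}

final case class Request(path: String)   // placeholder for the real request hierarchy

def retryBuffer(requests: Seq[Request]): mutable.ArrayBuffer[Request] =
  // ArrayBuffer(requests: _*) needs an immutable.Seq on 2.13; this form compiles everywhere
  // and allocates the right capacity up front.
  new mutable.ArrayBuffer[Request](requests.size) ++= requests
```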


@ -43,7 +43,7 @@ import org.apache.zookeeper.data.{ACL, Stat}
import scala.beans.BeanProperty import scala.beans.BeanProperty
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.ArrayBuffer
import scala.collection.{Seq, breakOut} import scala.collection.{Map, Seq}
import scala.util.{Failure, Success, Try} import scala.util.{Failure, Success, Try}
// This file contains objects for encoding/decoding data stored in ZooKeeper nodes (znodes). // This file contains objects for encoding/decoding data stored in ZooKeeper nodes (znodes).
@ -405,9 +405,9 @@ object ReassignPartitionsZNode {
def decode(bytes: Array[Byte]): Either[JsonProcessingException, collection.Map[TopicPartition, Seq[Int]]] = def decode(bytes: Array[Byte]): Either[JsonProcessingException, collection.Map[TopicPartition, Seq[Int]]] =
Json.parseBytesAs[PartitionAssignment](bytes).right.map { partitionAssignment => Json.parseBytesAs[PartitionAssignment](bytes).right.map { partitionAssignment =>
partitionAssignment.partitions.asScala.map { replicaAssignment => partitionAssignment.partitions.asScala.iterator.map { replicaAssignment =>
new TopicPartition(replicaAssignment.topic, replicaAssignment.partition) -> replicaAssignment.replicas.asScala new TopicPartition(replicaAssignment.topic, replicaAssignment.partition) -> replicaAssignment.replicas.asScala
}(breakOut) }.toMap
} }
} }


@ -35,6 +35,7 @@ import org.apache.zookeeper.data.{ACL, Stat}
import org.apache.zookeeper._ import org.apache.zookeeper._
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
import scala.collection.Seq
import scala.collection.mutable.Set import scala.collection.mutable.Set
/** /**


@ -20,6 +20,8 @@ package kafka.admin
import java.io.{ByteArrayOutputStream, PrintStream} import java.io.{ByteArrayOutputStream, PrintStream}
import java.nio.charset.StandardCharsets import java.nio.charset.StandardCharsets
import scala.collection.Seq
import kafka.integration.KafkaServerTestHarness import kafka.integration.KafkaServerTestHarness
import kafka.server.KafkaConfig import kafka.server.KafkaConfig
import kafka.utils.TestUtils import kafka.utils.TestUtils
@ -33,7 +35,7 @@ class BrokerApiVersionsCommandTest extends KafkaServerTestHarness {
def generateConfigs: Seq[KafkaConfig] = TestUtils.createBrokerConfigs(1, zkConnect).map(KafkaConfig.fromProps) def generateConfigs: Seq[KafkaConfig] = TestUtils.createBrokerConfigs(1, zkConnect).map(KafkaConfig.fromProps)
@Test(timeout=120000) @Test(timeout=120000)
def checkBrokerApiVersionCommandOutput() { def checkBrokerApiVersionCommandOutput(): Unit = {
val byteArrayOutputStream = new ByteArrayOutputStream val byteArrayOutputStream = new ByteArrayOutputStream
val printStream = new PrintStream(byteArrayOutputStream, false, StandardCharsets.UTF_8.name()) val printStream = new PrintStream(byteArrayOutputStream, false, StandardCharsets.UTF_8.name())
BrokerApiVersionsCommand.execute(Array("--bootstrap-server", brokerList), printStream) BrokerApiVersionsCommand.execute(Array("--bootstrap-server", brokerList), printStream)
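The `(): Unit =` changes in the test files are the removal of procedure syntax (`def f() { ... }`), which 2.13 flags as deprecated; the explicit form is equivalent and compiles cleanly on all three versions:

```scala
class ExampleTest {
  // Procedure syntax, deprecated on 2.13:
  //   def checkOutput() { assert(true) }
  // Explicit result type, fine on 2.11, 2.12 and 2.13:
  def checkOutput(): Unit = {
    assert(true)
  }
}
```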


@ -20,7 +20,7 @@ import org.junit.Test
class ReassignPartitionsIntegrationTest extends ZooKeeperTestHarness with RackAwareTest { class ReassignPartitionsIntegrationTest extends ZooKeeperTestHarness with RackAwareTest {
@Test @Test
def testRackAwareReassign() { def testRackAwareReassign(): Unit = {
val rackInfo = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack1", 4 -> "rack3", 5 -> "rack3") val rackInfo = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack1", 4 -> "rack3", 5 -> "rack3")
TestUtils.createBrokersInZk(toBrokerMetadata(rackInfo), zkClient) TestUtils.createBrokersInZk(toBrokerMetadata(rackInfo), zkClient)


@ -50,6 +50,7 @@ import org.junit.rules.Timeout
import org.junit.{After, Before, Rule, Test} import org.junit.{After, Before, Rule, Test}
import org.scalatest.Assertions.intercept import org.scalatest.Assertions.intercept
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
import scala.collection.Seq
import scala.compat.java8.OptionConverters._ import scala.compat.java8.OptionConverters._
import scala.concurrent.duration.Duration import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future} import scala.concurrent.{Await, Future}


@ -23,6 +23,7 @@ import org.junit.Test
import org.junit.Assert._ import org.junit.Assert._
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
import scala.collection.Seq
/** /**
* Integration tests for the consumer that cover basic usage as well as coordinator failure * Integration tests for the consumer that cover basic usage as well as coordinator failure


@ -31,6 +31,7 @@ import org.junit.{After, Ignore, Test}
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
import scala.collection.mutable import scala.collection.mutable
import scala.collection.Seq
/** /**
* Integration tests for the consumer that cover basic usage as well as server failures * Integration tests for the consumer that cover basic usage as well as server failures


@ -18,7 +18,7 @@ import java.io.File
import java.{lang, util} import java.{lang, util}
import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger} import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import java.util.{Collections, Properties} import java.util.Properties
import kafka.api.GroupedUserPrincipalBuilder._ import kafka.api.GroupedUserPrincipalBuilder._
import kafka.api.GroupedUserQuotaCallback._ import kafka.api.GroupedUserQuotaCallback._
@ -315,7 +315,7 @@ object GroupedUserQuotaCallback {
val QuotaGroupTag = "group" val QuotaGroupTag = "group"
val DefaultProduceQuotaProp = "default.produce.quota" val DefaultProduceQuotaProp = "default.produce.quota"
val DefaultFetchQuotaProp = "default.fetch.quota" val DefaultFetchQuotaProp = "default.fetch.quota"
val UnlimitedQuotaMetricTags = Collections.emptyMap[String, String] val UnlimitedQuotaMetricTags = new util.HashMap[String, String]
val quotaLimitCalls = Map( val quotaLimitCalls = Map(
ClientQuotaType.PRODUCE -> new AtomicInteger, ClientQuotaType.PRODUCE -> new AtomicInteger,
ClientQuotaType.FETCH -> new AtomicInteger, ClientQuotaType.FETCH -> new AtomicInteger,
@ -344,10 +344,8 @@ object GroupedUserQuotaCallback {
class GroupedUserQuotaCallback extends ClientQuotaCallback with Reconfigurable with Logging { class GroupedUserQuotaCallback extends ClientQuotaCallback with Reconfigurable with Logging {
var brokerId: Int = -1 var brokerId: Int = -1
val customQuotasUpdated = ClientQuotaType.values.toList val customQuotasUpdated = ClientQuotaType.values.map(quotaType => quotaType -> new AtomicBoolean).toMap
.map(quotaType =>(quotaType -> new AtomicBoolean)).toMap val quotas = ClientQuotaType.values.map(quotaType => quotaType -> new ConcurrentHashMap[String, Double]).toMap
val quotas = ClientQuotaType.values.toList
.map(quotaType => (quotaType -> new ConcurrentHashMap[String, Double])).toMap
val partitionRatio = new ConcurrentHashMap[String, Double]() val partitionRatio = new ConcurrentHashMap[String, Double]()
@ -398,7 +396,7 @@ class GroupedUserQuotaCallback extends ClientQuotaCallback with Reconfigurable w
override def updateClusterMetadata(cluster: Cluster): Boolean = { override def updateClusterMetadata(cluster: Cluster): Boolean = {
val topicsByGroup = cluster.topics.asScala.groupBy(group) val topicsByGroup = cluster.topics.asScala.groupBy(group)
!topicsByGroup.forall { case (group, groupTopics) => topicsByGroup.map { case (group, groupTopics) =>
val groupPartitions = groupTopics.flatMap(topic => cluster.partitionsForTopic(topic).asScala) val groupPartitions = groupTopics.flatMap(topic => cluster.partitionsForTopic(topic).asScala)
val totalPartitions = groupPartitions.size val totalPartitions = groupPartitions.size
val partitionsOnThisBroker = groupPartitions.count { p => p.leader != null && p.leader.id == brokerId } val partitionsOnThisBroker = groupPartitions.count { p => p.leader != null && p.leader.id == brokerId }
@ -409,7 +407,7 @@ class GroupedUserQuotaCallback extends ClientQuotaCallback with Reconfigurable w
else else
partitionsOnThisBroker.toDouble / totalPartitions partitionsOnThisBroker.toDouble / totalPartitions
partitionRatio.put(group, multiplier) != multiplier partitionRatio.put(group, multiplier) != multiplier
} }.exists(identity)
} }
override def updateQuota(quotaType: ClientQuotaType, quotaEntity: ClientQuotaEntity, newValue: Double): Unit = { override def updateQuota(quotaType: ClientQuotaType, quotaEntity: ClientQuotaEntity, newValue: Double): Unit = {
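The `forall` rewrite above matters because `forall` short-circuits as soon as the predicate returns `false`, so the `partitionRatio.put` buried inside it could be skipped for the remaining groups. Mapping over every group first forces all the puts to run, and `exists(identity)` then reports whether anything changed. A sketch with hypothetical group ratios:

```scala
import java.util.concurrent.ConcurrentHashMap

val partitionRatio = new ConcurrentHashMap[String, Double]()
val newRatios = Map("groupA" -> 0.5, "groupB" -> 0.25)   // hypothetical recomputed ratios

// Short-circuiting shape: the put for later groups may never run.
//   !newRatios.forall { case (group, ratio) => partitionRatio.put(group, ratio) != ratio }
// Non-short-circuiting shape: every put runs, then ask whether any ratio changed.
val changed: Boolean =
  newRatios.map { case (group, ratio) => partitionRatio.put(group, ratio) != ratio }.exists(identity)
```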


@ -33,6 +33,7 @@ import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySe
import org.junit.{After, Before} import org.junit.{After, Before}
import scala.collection.mutable import scala.collection.mutable
import scala.collection.Seq
/** /**
* A helper class for writing integration tests that involve producers, consumers, and servers * A helper class for writing integration tests that involve producers, consumers, and servers


@ -15,6 +15,8 @@ package kafka.api
import java.io.File import java.io.File
import java.util.{Locale, Properties} import java.util.{Locale, Properties}
import scala.collection.Seq
import kafka.server.KafkaConfig import kafka.server.KafkaConfig
import kafka.utils.{JaasTestUtils, TestUtils} import kafka.utils.{JaasTestUtils, TestUtils}
import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.network.ListenerName


@ -21,6 +21,8 @@ import javax.security.auth.callback._
import javax.security.auth.Subject import javax.security.auth.Subject
import javax.security.auth.login.AppConfigurationEntry import javax.security.auth.login.AppConfigurationEntry
import scala.collection.Seq
import kafka.server.KafkaConfig import kafka.server.KafkaConfig
import kafka.utils.{TestUtils} import kafka.utils.{TestUtils}
import kafka.utils.JaasTestUtils._ import kafka.utils.JaasTestUtils._


@ -21,6 +21,8 @@ import java.io.File
import java.util.Properties import java.util.Properties
import javax.security.auth.login.Configuration import javax.security.auth.login.Configuration
import scala.collection.Seq
import kafka.admin.ConfigCommand import kafka.admin.ConfigCommand
import kafka.security.minikdc.MiniKdc import kafka.security.minikdc.MiniKdc
import kafka.server.KafkaConfig import kafka.server.KafkaConfig


@ -36,6 +36,7 @@ import org.scalatest.Assertions.fail
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
import scala.collection.mutable.Buffer import scala.collection.mutable.Buffer
import scala.collection.Seq
import scala.concurrent.ExecutionException import scala.concurrent.ExecutionException
class TransactionsTest extends KafkaServerTestHarness { class TransactionsTest extends KafkaServerTestHarness {


@ -18,6 +18,8 @@ package kafka.server
import java.util.Optional import java.util.Optional
import scala.collection.Seq
import kafka.cluster.Partition import kafka.cluster.Partition
import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.FencedLeaderEpochException import org.apache.kafka.common.errors.FencedLeaderEpochException


@ -1233,7 +1233,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
private def reconfigureServers(newProps: Properties, perBrokerConfig: Boolean, aPropToVerify: (String, String), expectFailure: Boolean = false): Unit = { private def reconfigureServers(newProps: Properties, perBrokerConfig: Boolean, aPropToVerify: (String, String), expectFailure: Boolean = false): Unit = {
val alterResult = TestUtils.alterConfigs(servers, adminClients.head, newProps, perBrokerConfig) val alterResult = TestUtils.alterConfigs(servers, adminClients.head, newProps, perBrokerConfig)
if (expectFailure) { if (expectFailure) {
val oldProps = servers.head.config.values.asScala.filterKeys(newProps.containsKey) val oldProps = servers.head.config.values.asScala.filter { case (k, _) => newProps.containsKey(k) }
val brokerResources = if (perBrokerConfig) val brokerResources = if (perBrokerConfig)
servers.map(server => new ConfigResource(ConfigResource.Type.BROKER, server.config.brokerId.toString)) servers.map(server => new ConfigResource(ConfigResource.Type.BROKER, server.config.brokerId.toString))
else else
@ -1242,7 +1242,9 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
val exception = intercept[ExecutionException](alterResult.values.get(brokerResource).get) val exception = intercept[ExecutionException](alterResult.values.get(brokerResource).get)
assertTrue(exception.getCause.isInstanceOf[InvalidRequestException]) assertTrue(exception.getCause.isInstanceOf[InvalidRequestException])
} }
servers.foreach { server => assertEquals(oldProps, server.config.values.asScala.filterKeys(newProps.containsKey)) } servers.foreach { server =>
assertEquals(oldProps, server.config.values.asScala.filter { case (k, _) => newProps.containsKey(k) })
}
} else { } else {
alterResult.all.get alterResult.all.get
waitForConfig(aPropToVerify._1, aPropToVerify._2) waitForConfig(aPropToVerify._1, aPropToVerify._2)


@ -19,6 +19,8 @@ package kafka.server
import java.util.Properties import java.util.Properties
import scala.collection.Seq
import kafka.utils.JaasTestUtils import kafka.utils.JaasTestUtils
import kafka.utils.JaasTestUtils.JaasSection import kafka.utils.JaasTestUtils.JaasSection


@ -19,6 +19,8 @@ package kafka.server
import java.util.Properties import java.util.Properties
import scala.collection.Seq
import kafka.api.Both import kafka.api.Both
import kafka.utils.JaasTestUtils.JaasSection import kafka.utils.JaasTestUtils.JaasSection


@ -38,6 +38,7 @@ import org.junit.{After, Before, Test}
import scala.collection.mutable import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.ArrayBuffer
import scala.collection.Seq
object MultipleListenersWithSameSecurityProtocolBaseTest { object MultipleListenersWithSameSecurityProtocolBaseTest {
val SecureInternal = "SECURE_INTERNAL" val SecureInternal = "SECURE_INTERNAL"


@ -18,6 +18,8 @@ package kafka.tools
import java.util.Properties import java.util.Properties
import scala.collection.Seq
import kafka.integration.KafkaServerTestHarness import kafka.integration.KafkaServerTestHarness
import kafka.server.KafkaConfig import kafka.server.KafkaConfig
import kafka.tools.MirrorMaker.{ConsumerWrapper, MirrorMakerProducer, NoRecordsException} import kafka.tools.MirrorMaker.{ConsumerWrapper, MirrorMakerProducer, NoRecordsException}


@ -148,7 +148,7 @@ class AdminRackAwareTest extends RackAwareTest with Logging {
val replicationFactor = 5 val replicationFactor = 5
val brokerRackMapping = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack3", 4 -> "rack3", 5 -> "rack2") val brokerRackMapping = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack3", 4 -> "rack3", 5 -> "rack2")
val assignment = AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions, replicationFactor) val assignment = AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions, replicationFactor)
assertEquals(List.fill(assignment.size)(replicationFactor), assignment.values.map(_.size)) assertEquals(List.fill(assignment.size)(replicationFactor), assignment.values.toIndexedSeq.map(_.size))
val distribution = getReplicaDistribution(assignment, brokerRackMapping) val distribution = getReplicaDistribution(assignment, brokerRackMapping)
for (partition <- 0 until numPartitions) for (partition <- 0 until numPartitions)
assertEquals(3, distribution.partitionRacks(partition).toSet.size) assertEquals(3, distribution.partitionRacks(partition).toSet.size)
@ -161,7 +161,7 @@ class AdminRackAwareTest extends RackAwareTest with Logging {
val brokerRackMapping = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack3", 4 -> "rack3", 5 -> "rack2") val brokerRackMapping = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack3", 4 -> "rack3", 5 -> "rack2")
val assignment = AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions, val assignment = AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions,
replicationFactor) replicationFactor)
assertEquals(List.fill(assignment.size)(replicationFactor), assignment.values.map(_.size)) assertEquals(List.fill(assignment.size)(replicationFactor), assignment.values.toIndexedSeq.map(_.size))
val distribution = getReplicaDistribution(assignment, brokerRackMapping) val distribution = getReplicaDistribution(assignment, brokerRackMapping)
for (partition <- 0 to 5) for (partition <- 0 to 5)
assertEquals(2, distribution.partitionRacks(partition).toSet.size) assertEquals(2, distribution.partitionRacks(partition).toSet.size)
@ -173,7 +173,7 @@ class AdminRackAwareTest extends RackAwareTest with Logging {
val replicationFactor = 3 val replicationFactor = 3
val brokerRackMapping = Map(0 -> "rack1", 1 -> "rack1", 2 -> "rack1", 3 -> "rack1", 4 -> "rack1", 5 -> "rack1") val brokerRackMapping = Map(0 -> "rack1", 1 -> "rack1", 2 -> "rack1", 3 -> "rack1", 4 -> "rack1", 5 -> "rack1")
val assignment = AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions, replicationFactor) val assignment = AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions, replicationFactor)
assertEquals(List.fill(assignment.size)(replicationFactor), assignment.values.map(_.size)) assertEquals(List.fill(assignment.size)(replicationFactor), assignment.values.toIndexedSeq.map(_.size))
val distribution = getReplicaDistribution(assignment, brokerRackMapping) val distribution = getReplicaDistribution(assignment, brokerRackMapping)
for (partition <- 0 until numPartitions) for (partition <- 0 until numPartitions)
assertEquals(1, distribution.partitionRacks(partition).toSet.size) assertEquals(1, distribution.partitionRacks(partition).toSet.size)


@ -16,13 +16,16 @@
*/ */
package kafka.admin package kafka.admin
import java.util.Properties
import scala.collection.Seq
import kafka.log.Log import kafka.log.Log
import kafka.zk.{TopicPartitionZNode, ZooKeeperTestHarness} import kafka.zk.{TopicPartitionZNode, ZooKeeperTestHarness}
import kafka.utils.TestUtils import kafka.utils.TestUtils
import kafka.server.{KafkaConfig, KafkaServer} import kafka.server.{KafkaConfig, KafkaServer}
import org.junit.Assert._ import org.junit.Assert._
import org.junit.{After, Test} import org.junit.{After, Test}
import java.util.Properties
import kafka.admin.TopicCommand.ZookeeperTopicService import kafka.admin.TopicCommand.ZookeeperTopicService
import kafka.common.TopicAlreadyMarkedForDeletionException import kafka.common.TopicAlreadyMarkedForDeletionException


@ -36,6 +36,7 @@ import org.junit.Assert._
import org.junit.Before import org.junit.Before
import org.junit.Test import org.junit.Test
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
import scala.collection.Seq
import scala.concurrent.duration._ import scala.concurrent.duration._
final class LeaderElectionCommandTest extends ZooKeeperTestHarness { final class LeaderElectionCommandTest extends ZooKeeperTestHarness {


@ -21,6 +21,8 @@ import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths} import java.nio.file.{Files, Paths}
import java.util.Properties import java.util.Properties
import scala.collection.Seq
import kafka.common.AdminCommandFailedException import kafka.common.AdminCommandFailedException
import kafka.network.RequestChannel import kafka.network.RequestChannel
import kafka.security.auth._ import kafka.security.auth._
@ -56,7 +58,7 @@ class PreferredReplicaLeaderElectionCommandTest extends ZooKeeperTestHarness wit
brokerConfigs.foreach(p => p.setProperty("authorizer.class.name", className)) brokerConfigs.foreach(p => p.setProperty("authorizer.class.name", className))
case None => case None =>
} }
createTestTopicAndCluster(topicPartition,brokerConfigs) createTestTopicAndCluster(topicPartition, brokerConfigs)
} }
private def createTestTopicAndCluster(partitionsAndAssignments: Map[TopicPartition, List[Int]], private def createTestTopicAndCluster(partitionsAndAssignments: Map[TopicPartition, List[Int]],


@ -37,6 +37,7 @@ import scala.collection.JavaConverters._
import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.TopicPartition
import scala.collection.mutable import scala.collection.mutable
import scala.collection.Seq
class ReassignPartitionsCommandTest extends ZooKeeperTestHarness with Logging { class ReassignPartitionsCommandTest extends ZooKeeperTestHarness with Logging {
var servers: Seq[KafkaServer] = Seq() var servers: Seq[KafkaServer] = Seq()


@ -16,6 +16,8 @@ import java.io.{BufferedWriter, File, FileWriter}
import java.text.{ParseException, SimpleDateFormat} import java.text.{ParseException, SimpleDateFormat}
import java.util.{Calendar, Date, Properties} import java.util.{Calendar, Date, Properties}
import scala.collection.Seq
import joptsimple.OptionException import joptsimple.OptionException
import kafka.admin.ConsumerGroupCommand.ConsumerGroupService import kafka.admin.ConsumerGroupCommand.ConsumerGroupService
import kafka.server.KafkaConfig import kafka.server.KafkaConfig
@ -325,7 +327,7 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
val tp1 = new TopicPartition(topic1, 0) val tp1 = new TopicPartition(topic1, 0)
val tp2 = new TopicPartition(topic2, 0) val tp2 = new TopicPartition(topic2, 0)
val allResetOffsets = resetOffsets(consumerGroupCommand)(group).mapValues(_.offset()) val allResetOffsets = resetOffsets(consumerGroupCommand)(group).mapValues(_.offset).toMap
assertEquals(Map(tp1 -> 0L, tp2 -> 0L), allResetOffsets) assertEquals(Map(tp1 -> 0L, tp2 -> 0L), allResetOffsets)
assertEquals(Map(tp1 -> 0L), committedOffsets(topic1))
assertEquals(Map(tp2 -> 0L), committedOffsets(topic2))
@@ -353,7 +355,7 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
val tp1 = new TopicPartition(topic1, 1)
val tp2 = new TopicPartition(topic2, 1)
-val allResetOffsets = resetOffsets(consumerGroupCommand)(group).mapValues(_.offset())
+val allResetOffsets = resetOffsets(consumerGroupCommand)(group).mapValues(_.offset).toMap
assertEquals(Map(tp1 -> 0, tp2 -> 0), allResetOffsets)
assertEquals(priorCommittedOffsets1 + (tp1 -> 0L), committedOffsets(topic1))
@@ -383,12 +385,12 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
val bw = new BufferedWriter(new FileWriter(file))
bw.write(consumerGroupCommand.exportOffsetsToCsv(exportedOffsets))
bw.close()
-assertEquals(Map(tp0 -> 2L, tp1 -> 2L), exportedOffsets(group).mapValues(_.offset))
+assertEquals(Map(tp0 -> 2L, tp1 -> 2L), exportedOffsets(group).mapValues(_.offset).toMap)
val cgcArgsExec = buildArgsForGroup(group, "--all-topics", "--from-file", file.getCanonicalPath, "--dry-run")
val consumerGroupCommandExec = getConsumerGroupService(cgcArgsExec)
val importedOffsets = consumerGroupCommandExec.resetOffsets()
-assertEquals(Map(tp0 -> 2L, tp1 -> 2L), importedOffsets(group).mapValues(_.offset))
+assertEquals(Map(tp0 -> 2L, tp1 -> 2L), importedOffsets(group).mapValues(_.offset).toMap)
adminZkClient.deleteTopic(topic)
}
@@ -421,21 +423,21 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
val bw = new BufferedWriter(new FileWriter(file))
bw.write(consumerGroupCommand.exportOffsetsToCsv(exportedOffsets))
bw.close()
-assertEquals(Map(t1p0 -> 2L, t1p1 -> 2L), exportedOffsets(group1).mapValues(_.offset))
-assertEquals(Map(t2p0 -> 2L, t2p1 -> 2L), exportedOffsets(group2).mapValues(_.offset))
+assertEquals(Map(t1p0 -> 2L, t1p1 -> 2L), exportedOffsets(group1).mapValues(_.offset).toMap)
+assertEquals(Map(t2p0 -> 2L, t2p1 -> 2L), exportedOffsets(group2).mapValues(_.offset).toMap)
// Multiple --group's offset import
val cgcArgsExec = buildArgsForGroups(Seq(group1, group2), "--all-topics", "--from-file", file.getCanonicalPath, "--dry-run")
val consumerGroupCommandExec = getConsumerGroupService(cgcArgsExec)
val importedOffsets = consumerGroupCommandExec.resetOffsets()
-assertEquals(Map(t1p0 -> 2L, t1p1 -> 2L), importedOffsets(group1).mapValues(_.offset))
-assertEquals(Map(t2p0 -> 2L, t2p1 -> 2L), importedOffsets(group2).mapValues(_.offset))
+assertEquals(Map(t1p0 -> 2L, t1p1 -> 2L), importedOffsets(group1).mapValues(_.offset).toMap)
+assertEquals(Map(t2p0 -> 2L, t2p1 -> 2L), importedOffsets(group2).mapValues(_.offset).toMap)
// Single --group offset import using "group,topic,partition,offset" csv format
val cgcArgsExec2 = buildArgsForGroup(group1, "--all-topics", "--from-file", file.getCanonicalPath, "--dry-run")
val consumerGroupCommandExec2 = getConsumerGroupService(cgcArgsExec2)
val importedOffsets2 = consumerGroupCommandExec2.resetOffsets()
-assertEquals(Map(t1p0 -> 2L, t1p1 -> 2L), importedOffsets2(group1).mapValues(_.offset))
+assertEquals(Map(t1p0 -> 2L, t1p1 -> 2L), importedOffsets2(group1).mapValues(_.offset).toMap)
adminZkClient.deleteTopic(topic)
}
@@ -482,7 +484,7 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
(group, partitionInfo) <- resetOffsetsResultByGroup
} {
val priorOffsets = committedOffsets(topic = topic, group = group)
-assertEquals(expectedOffsets(topic), partitionInfo.filter(partitionInfo => partitionInfo._1.topic() == topic).mapValues(_.offset))
+assertEquals(expectedOffsets(topic), partitionInfo.filter(partitionInfo => partitionInfo._1.topic() == topic).mapValues(_.offset).toMap)
assertEquals(if (dryRun) priorOffsets else expectedOffsets(topic), committedOffsets(topic = topic, group = group))
}
} finally {
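Note on the recurring `.toMap` additions above: `Map#mapValues` returns a strict `Map` on Scala 2.11/2.12 but a lazy `MapView` on 2.13, so a bare `mapValues` result no longer compares equal to a `Map` literal in these assertions. A minimal, self-contained sketch (object and value names are illustrative, not from the Kafka code):

```scala
object MapValuesMigration extends App {
  val offsets = Map("t1p0" -> 2L, "t1p1" -> 2L)

  // mapValues is a strict Map on 2.11/2.12 but a lazy MapView on 2.13,
  // so the assertions above force a Map with .toMap before comparing.
  val viaMapValues = offsets.mapValues(identity).toMap

  // Equivalent strict rewrite that avoids the (2.13-deprecated) mapValues entirely.
  val viaMap = offsets.map { case (k, v) => (k, v) }

  assert(viaMapValues == offsets && viaMap == offsets)
}
```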

View File

@@ -37,6 +37,7 @@ import org.junit.{After, Before, Rule, Test}
import org.scalatest.Assertions.{fail, intercept}
import scala.collection.JavaConverters._
+import scala.collection.Seq
import scala.concurrent.ExecutionException
import scala.util.Random
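The `import scala.collection.Seq` lines added in this and many of the following files exist because the default `Seq` changed in 2.13: `scala.Seq` became `scala.collection.immutable.Seq`, whereas 2.11/2.12 aliased it to the more general `scala.collection.Seq`. Importing the latter explicitly keeps signatures that accept mutable buffers compiling on all three versions. A hypothetical illustration:

```scala
import scala.collection.Seq          // without this, Seq means immutable.Seq on 2.13
import scala.collection.mutable.ArrayBuffer

object SeqImportSketch {
  // Accepts both mutable and immutable sequences on 2.11, 2.12 and 2.13.
  def describe(xs: Seq[Int]): String = xs.mkString(",")

  def main(args: Array[String]): Unit = {
    val buf = ArrayBuffer(1, 2, 3)
    // With the default (immutable) Seq on 2.13 this call would not compile.
    println(describe(buf))
  }
}
```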

View File

@@ -754,7 +754,7 @@ class PartitionTest {
}
assertThrows[IllegalArgumentException] {
-val replica = partition.getOrCreateReplica(brokerId)
+partition.getOrCreateReplica(brokerId)
}
val remoteReplicaId = brokerId + 1;

View File

@@ -23,6 +23,7 @@ import org.apache.kafka.common.resource.PatternType.LITERAL
import org.junit.{After, Before, Test}
import scala.collection.mutable.ArrayBuffer
+import scala.collection.Seq
class ZkNodeChangeNotificationListenerTest extends ZooKeeperTestHarness {

View File

@@ -71,10 +71,10 @@ class ControllerChannelManagerTest {
assertEquals(controllerId, leaderAndIsrRequest.controllerId)
assertEquals(controllerEpoch, leaderAndIsrRequest.controllerEpoch)
assertEquals(partitions.keySet, leaderAndIsrRequest.partitionStates.keySet.asScala)
-assertEquals(partitions.mapValues(_.leader),
-leaderAndIsrRequest.partitionStates.asScala.mapValues(_.basePartitionState.leader))
-assertEquals(partitions.mapValues(_.isr),
-leaderAndIsrRequest.partitionStates.asScala.mapValues(_.basePartitionState.isr.asScala))
+assertEquals(partitions.map { case (k, v) => (k, v.leader) },
+leaderAndIsrRequest.partitionStates.asScala.mapValues(_.basePartitionState.leader).toMap)
+assertEquals(partitions.map { case (k, v) => (k, v.isr) },
+leaderAndIsrRequest.partitionStates.asScala.mapValues(_.basePartitionState.isr.asScala).toMap)
applyLeaderAndIsrResponseCallbacks(Errors.NONE, batch.sentRequests(2).toList)
assertEquals(1, batch.sentEvents.size)
@@ -202,8 +202,10 @@ class ControllerChannelManagerTest {
val updateMetadataRequest = updateMetadataRequests.head
assertEquals(3, updateMetadataRequest.partitionStates.size)
-assertEquals(partitions.mapValues(_.leader), updateMetadataRequest.partitionStates.asScala.mapValues(_.basePartitionState.leader))
-assertEquals(partitions.mapValues(_.isr), updateMetadataRequest.partitionStates.asScala.mapValues(_.basePartitionState.isr.asScala))
+assertEquals(partitions.map { case (k, v) => (k, v.leader) },
+updateMetadataRequest.partitionStates.asScala.map { case (k, v) => (k, v.basePartitionState.leader) })
+assertEquals(partitions.map { case (k, v) => (k, v.isr) },
+updateMetadataRequest.partitionStates.asScala.map { case (k, v) => (k, v.basePartitionState.isr.asScala) })
assertEquals(controllerId, updateMetadataRequest.controllerId)
assertEquals(controllerEpoch, updateMetadataRequest.controllerEpoch)
@@ -272,9 +274,11 @@ class ControllerChannelManagerTest {
.map(_.basePartitionState.leader)
.forall(leaderId => leaderId == LeaderAndIsr.LeaderDuringDelete))
-assertEquals(partitions.filterKeys(_.topic == "bar").mapValues(_.leader),
-updateMetadataRequest.partitionStates.asScala.filterKeys(_.topic == "bar").mapValues(_.basePartitionState.leader))
-assertEquals(partitions.mapValues(_.isr), updateMetadataRequest.partitionStates.asScala.mapValues(_.basePartitionState.isr.asScala))
+assertEquals(partitions.filter { case (k, _) => k.topic == "bar" }.map { case (k, v) => (k, v.leader) },
+updateMetadataRequest.partitionStates.asScala.filter { case (k, _) => k.topic == "bar" }.map { case (k, v) =>
+(k, v.basePartitionState.leader) })
+assertEquals(partitions.map { case (k, v) => (k, v.isr) },
+updateMetadataRequest.partitionStates.asScala.map { case (k, v) => (k, v.basePartitionState.isr.asScala) })
assertEquals(3, updateMetadataRequest.liveBrokers.size)
assertEquals(Set(1, 2, 3), updateMetadataRequest.liveBrokers.asScala.map(_.id).toSet)
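`filterKeys` has the same caveat as `mapValues`: on 2.13 it yields a lazy view, so the assertions above switch to `filter` with a pattern-matching function, which stays a strict `Map` on every supported Scala version. A sketch with made-up partition data (names are illustrative):

```scala
object FilterKeysMigration extends App {
  val leaders = Map(("foo", 0) -> 1, ("bar", 0) -> 2, ("bar", 1) -> 3)

  // Old: leaders.filterKeys(_._1 == "bar") -- a Map on 2.12, a lazy view on 2.13.
  // Portable: filter on the (key, value) pair, which is strict everywhere.
  val barLeaders = leaders.filter { case ((topic, _), _) => topic == "bar" }

  assert(barLeaders == Map(("bar", 0) -> 2, ("bar", 1) -> 3))
}
```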

View File

@@ -103,7 +103,7 @@ class ControllerEventManagerTest {
@Test
def testSuccessfulEvent(): Unit = {
check("kafka.controller:type=ControllerStats,name=AutoLeaderBalanceRateAndTimeMs",
-AutoPreferredReplicaLeaderElection, () => Unit)
+AutoPreferredReplicaLeaderElection, () => ())
}
@Test
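The change from `() => Unit` to `() => ()` is subtle: `Unit` in expression position refers to the `Unit` companion object rather than the unit value, and 2.13 is stricter about that mismatch, so the no-op callback now returns the value `()` directly. A minimal sketch (names are illustrative):

```scala
object NoOpCallbackSketch extends App {
  // `() => Unit` makes the body the Unit companion object; earlier compilers
  // quietly discarded it, 2.13 complains, so the unit value `()` is spelled out.
  val noOp: () => Unit = () => ()
  noOp()
}
```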

View File

@@ -37,6 +37,7 @@ import org.scalatest.Assertions.fail
import scala.collection.JavaConverters._
import scala.collection.mutable
+import scala.collection.Seq
import scala.util.{Failure, Success, Try}
class ControllerIntegrationTest extends ZooKeeperTestHarness {

View File

@@ -21,7 +21,7 @@ import kafka.common.StateChangeFailedException
import kafka.controller.Election._
import org.apache.kafka.common.TopicPartition
-import scala.collection.{breakOut, mutable}
+import scala.collection.{Seq, mutable}
class MockPartitionStateMachine(controllerContext: ControllerContext,
uncleanLeaderElectionEnabled: Boolean)
@@ -111,7 +111,7 @@ class MockPartitionStateMachine(controllerContext: ControllerContext,
}
partition -> value
-}(breakOut)
+}.toMap
results ++ failedElections
}
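`breakOut` no longer exists in 2.13, so pipelines that used it to build the target collection directly are rewritten with an explicit conversion such as `.toMap`. A sketch of the pattern, with made-up values:

```scala
object BreakOutMigration extends App {
  val partitions = Seq("tp-0", "tp-1", "tp-2")

  // Pre-2.13 this could be written as:
  //   val results: Map[String, Int] = partitions.map(p => p -> 1)(breakOut)
  // breakOut is gone, so the Map is built explicitly instead.
  val results: Map[String, Int] = partitions.map(p => p -> 1).toMap

  assert(results("tp-1") == 1)
}
```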

View File

@@ -16,6 +16,8 @@
*/
package kafka.controller
+import scala.collection.Seq
class MockReplicaStateMachine(controllerContext: ControllerContext) extends ReplicaStateMachine(controllerContext) {
override def handleStateChanges(replicas: Seq[PartitionAndReplica], targetState: ReplicaState): Unit = {

View File

@@ -30,7 +30,6 @@ import org.easymock.EasyMock
import org.junit.Assert._
import org.junit.{Before, Test}
import org.mockito.Mockito
-import scala.collection.breakOut
class PartitionStateMachineTest {
private var controllerContext: ControllerContext = null
@@ -431,9 +430,9 @@ class PartitionStateMachineTest {
def prepareMockToUpdateLeaderAndIsr(): Unit = {
val updatedLeaderAndIsr: Map[TopicPartition, LeaderAndIsr] = partitions.map { partition =>
partition -> leaderAndIsr.newLeaderAndIsr(brokerId, List(brokerId))
-}(breakOut)
+}.toMap
EasyMock.expect(mockZkClient.updateLeaderAndIsr(updatedLeaderAndIsr, controllerEpoch, controllerContext.epochZkVersion))
-.andReturn(UpdateLeaderAndIsrResult(updatedLeaderAndIsr.mapValues(Right(_)), Seq.empty))
+.andReturn(UpdateLeaderAndIsrResult(updatedLeaderAndIsr.mapValues(Right(_)).toMap, Seq.empty))
}
prepareMockToUpdateLeaderAndIsr()
}

View File

@@ -104,7 +104,7 @@ class GroupMetadataManagerTest {
)
val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets)
-val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE, offsetCommitRecords: _*)
+val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE, offsetCommitRecords.toArray: _*)
expectGroupMetadataLoad(groupMetadataTopicPartition, startOffset, records)
EasyMock.replay(replicaManager)
@@ -135,7 +135,7 @@ class GroupMetadataManagerTest {
val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets)
val groupMetadataRecord = buildEmptyGroupRecord(generation, protocolType)
val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE,
-offsetCommitRecords ++ Seq(groupMetadataRecord): _*)
+(offsetCommitRecords ++ Seq(groupMetadataRecord)).toArray: _*)
expectGroupMetadataLoad(groupMetadataTopicPartition, startOffset, records)
@@ -566,7 +566,7 @@ class GroupMetadataManagerTest {
val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets)
val tombstone = new SimpleRecord(GroupMetadataManager.offsetCommitKey(groupId, tombstonePartition), null)
val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE,
-offsetCommitRecords ++ Seq(tombstone): _*)
+(offsetCommitRecords ++ Seq(tombstone)).toArray: _*)
expectGroupMetadataLoad(groupMetadataTopicPartition, startOffset, records)
@@ -603,7 +603,7 @@ class GroupMetadataManagerTest {
val memberId = "98098230493"
val groupMetadataRecord = buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId)
val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE,
-offsetCommitRecords ++ Seq(groupMetadataRecord): _*)
+(offsetCommitRecords ++ Seq(groupMetadataRecord)).toArray: _*)
expectGroupMetadataLoad(groupMetadataTopicPartition, startOffset, records)
@@ -635,7 +635,7 @@ class GroupMetadataManagerTest {
protocolType = "consumer", protocol = "range", memberId)
val groupMetadataTombstone = new SimpleRecord(GroupMetadataManager.groupMetadataKey(groupId), null)
val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE,
-Seq(groupMetadataRecord, groupMetadataTombstone): _*)
+Seq(groupMetadataRecord, groupMetadataTombstone).toArray: _*)
expectGroupMetadataLoad(groupMetadataTopicPartition, startOffset, records)
@@ -664,7 +664,7 @@ class GroupMetadataManagerTest {
val groupMetadataRecord = buildStableGroupRecordWithMember(generation = 15,
protocolType = "consumer", protocol = "range", memberId, assignmentSize)
val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE,
-offsetCommitRecords ++ Seq(groupMetadataRecord): _*)
+(offsetCommitRecords ++ Seq(groupMetadataRecord)).toArray: _*)
expectGroupMetadataLoad(groupMetadataTopicPartition, startOffset, records)
@@ -700,7 +700,7 @@ class GroupMetadataManagerTest {
val groupMetadataRecord = buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId)
val groupMetadataTombstone = new SimpleRecord(GroupMetadataManager.groupMetadataKey(groupId), null)
val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE,
-Seq(groupMetadataRecord, groupMetadataTombstone) ++ offsetCommitRecords: _*)
+(Seq(groupMetadataRecord, groupMetadataTombstone) ++ offsetCommitRecords).toArray: _*)
expectGroupMetadataLoad(groupMetadataTopicPartition, startOffset, records)
@@ -734,15 +734,15 @@ class GroupMetadataManagerTest {
val segment1MemberId = "a"
val segment1Offsets = Map(tp0 -> 23L, tp1 -> 455L, tp3 -> 42L)
val segment1Records = MemoryRecords.withRecords(startOffset, CompressionType.NONE,
-createCommittedOffsetRecords(segment1Offsets) ++ Seq(buildStableGroupRecordWithMember(
-generation, protocolType, protocol, segment1MemberId)): _*)
+(createCommittedOffsetRecords(segment1Offsets) ++ Seq(buildStableGroupRecordWithMember(
+generation, protocolType, protocol, segment1MemberId))).toArray: _*)
val segment1End = expectGroupMetadataLoad(logMock, startOffset, segment1Records)
val segment2MemberId = "b"
val segment2Offsets = Map(tp0 -> 33L, tp2 -> 8992L, tp3 -> 10L)
val segment2Records = MemoryRecords.withRecords(segment1End, CompressionType.NONE,
-createCommittedOffsetRecords(segment2Offsets) ++ Seq(buildStableGroupRecordWithMember(
-generation, protocolType, protocol, segment2MemberId)): _*)
+(createCommittedOffsetRecords(segment2Offsets) ++ Seq(buildStableGroupRecordWithMember(
+generation, protocolType, protocol, segment2MemberId))).toArray: _*)
val segment2End = expectGroupMetadataLoad(logMock, segment1End, segment2Records)
EasyMock.expect(replicaManager.getLogEndOffset(groupTopicPartition)).andStubReturn(Some(segment2End))
@@ -1686,7 +1686,7 @@ class GroupMetadataManagerTest {
val memberId = "98098230493"
val groupMetadataRecord = buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId, apiVersion = apiVersion)
val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE,
-offsetCommitRecords ++ Seq(groupMetadataRecord): _*)
+(offsetCommitRecords ++ Seq(groupMetadataRecord)).toArray: _*)
expectGroupMetadataLoad(groupMetadataTopicPartition, startOffset, records)
@@ -1726,7 +1726,7 @@ class GroupMetadataManagerTest {
val memberId = "98098230493"
val groupMetadataRecord = buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId)
val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE,
-offsetCommitRecords ++ Seq(groupMetadataRecord): _*)
+(offsetCommitRecords ++ Seq(groupMetadataRecord)).toArray: _*)
expectGroupMetadataLoad(groupMetadataTopicPartition, startOffset, records)
@@ -1856,7 +1856,7 @@ class GroupMetadataManagerTest {
val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets)
val groupMetadataRecord = buildEmptyGroupRecord(generation, protocolType)
val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE,
-offsetCommitRecords ++ Seq(groupMetadataRecord): _*)
+(offsetCommitRecords ++ Seq(groupMetadataRecord)).toArray: _*)
// Prepend empty control batch to valid records
val mockBatch: MutableRecordBatch = EasyMock.createMock(classOf[MutableRecordBatch])
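The repeated `.toArray` additions in this file follow from 2.13 tightening variable-argument call sites: only an `Array` or an `immutable.Seq` may be expanded with `: _*`, while these helpers return `scala.collection.Seq`/mutable buffers. A hedged sketch of the same constraint against a hypothetical varargs method (not the real `MemoryRecords` API):

```scala
import scala.collection.mutable.ArrayBuffer

object VarargsMigration {
  // Stand-in for a varargs API such as the one used above.
  def concat(parts: String*): String = parts.mkString("|")

  def main(args: Array[String]): Unit = {
    val records = ArrayBuffer("a", "b", "c")
    // `concat(records: _*)` compiles on 2.12 but not on 2.13, because varargs
    // expect an immutable.Seq there; converting to an Array works everywhere.
    println(concat(records.toArray: _*))
  }
}
```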

View File

@@ -251,7 +251,7 @@ class TransactionCoordinatorConcurrencyTest extends AbstractCoordinatorConcurren
val topicPartition = new TopicPartition(TRANSACTION_STATE_TOPIC_NAME, partitionId)
val startOffset = replicaManager.getLogEndOffset(topicPartition).getOrElse(20L)
-val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE, txnRecordsByPartition(partitionId): _*)
+val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE, txnRecordsByPartition(partitionId).toArray: _*)
val endOffset = startOffset + records.records.asScala.size
EasyMock.expect(logMock.logStartOffset).andStubReturn(startOffset)

View File

@@ -156,7 +156,7 @@ class TransactionStateManagerTest {
txnRecords += new SimpleRecord(txnMessageKeyBytes2, TransactionLog.valueToBytes(txnMetadata2.prepareNoTransit()))
val startOffset = 15L // it should work for any start offset
-val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE, txnRecords: _*)
+val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE, txnRecords.toArray: _*)
prepareTxnLog(topicPartition, startOffset, records)
@@ -538,7 +538,7 @@ class TransactionStateManagerTest {
txnRecords += new SimpleRecord(txnMessageKeyBytes1, TransactionLog.valueToBytes(txnMetadata1.prepareNoTransit()))
val startOffset = 0L
-val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE, txnRecords: _*)
+val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE, txnRecords.toArray: _*)
prepareTxnLog(topicPartition, 0, records)

View File

@@ -26,6 +26,7 @@ import kafka.zk.ZooKeeperTestHarness
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.junit.{After, Before}
+import scala.collection.Seq
import scala.collection.mutable.{ArrayBuffer, Buffer}
import java.util.Properties

View File

@@ -22,6 +22,7 @@ import org.junit.{After, Before, Test}
import scala.util.Random
import scala.collection.JavaConverters._
+import scala.collection.Seq
import org.apache.log4j.{Level, Logger}
import java.util.Properties
import java.util.concurrent.ExecutionException

View File

@@ -33,7 +33,6 @@ import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import org.junit.runners.Parameterized.Parameters
-import scala.Seq
import scala.collection._
/**

View File

@@ -1283,7 +1283,7 @@ class LogCleanerTest {
val numSegmentsInitial = log.logSegments.size
val allKeys = LogTest.keysInLog(log).toList
-val expectedKeysAfterCleaning = mutable.MutableList[Long]()
+val expectedKeysAfterCleaning = new mutable.ArrayBuffer[Long]()
// pretend we want to clean every alternate key
val offsetMap = new FakeOffsetMap(Int.MaxValue)
@@ -1642,9 +1642,9 @@ class LogCleanerTest {
new SimpleRecord(time.milliseconds(), keyBytes, keyBytes) // the value doesn't matter since we validate offsets
}
val records = if (isTransactional)
-MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, producerEpoch, sequence, simpleRecords: _*)
+MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, producerEpoch, sequence, simpleRecords.toArray: _*)
else
-MemoryRecords.withIdempotentRecords(CompressionType.NONE, producerId, producerEpoch, sequence, simpleRecords: _*)
+MemoryRecords.withIdempotentRecords(CompressionType.NONE, producerId, producerEpoch, sequence, simpleRecords.toArray: _*)
sequence += simpleRecords.size
log.appendAsLeader(records, leaderEpoch, isFromClient)
}
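The accumulator swap above reflects `mutable.MutableList` being dropped from the 2.13 standard library; `ArrayBuffer` offers the same append-based usage on all three versions. For instance (values are illustrative):

```scala
import scala.collection.mutable

object BufferAccumulator extends App {
  // mutable.MutableList[Long]() no longer compiles on 2.13; ArrayBuffer does everywhere.
  val expectedKeys = new mutable.ArrayBuffer[Long]()
  (0L until 5L).foreach(k => if (k % 2 == 0) expectedKeys += k)
  assert(expectedKeys == mutable.ArrayBuffer(0L, 2L, 4L))
}
```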

View File

@@ -807,8 +807,7 @@ class SocketServerTest {
def requestMetricMeters = YammerMetrics
.defaultRegistry
.allMetrics.asScala
-.filterKeys(k => k.getType == "RequestMetrics")
-.collect { case (k, metric: Meter) => (k.toString, metric.count) }
+.collect { case (k, metric: Meter) if k.getType == "RequestMetrics" => (k.toString, metric.count) }
assertEquals(nonZeroMeters, requestMetricMeters.filter { case (_, value) => value != 0 })
server.shutdown()
@@ -822,7 +821,7 @@ class SocketServerTest {
val nonZeroMetricNamesAndValues = YammerMetrics
.defaultRegistry
.allMetrics.asScala
-.filterKeys(k => k.getName.endsWith("IdlePercent") || k.getName.endsWith("NetworkProcessorAvgIdlePercent"))
+.filter { case (k, _) => k.getName.endsWith("IdlePercent") || k.getName.endsWith("NetworkProcessorAvgIdlePercent") }
.collect { case (k, metric: Gauge[_]) => (k, metric.value().asInstanceOf[Double]) }
.filter { case (_, value) => value != 0.0 && !value.equals(Double.NaN) }
@@ -1068,7 +1067,7 @@ class SocketServerTest {
testableSelector.operationCounts.clear()
testableSelector.addFailure(SelectorOperation.Poll,
-Some(new RuntimeException("ControlThrowable exception during poll()") with ControlThrowable))
+Some(new ControlThrowable() {}))
testableSelector.waitForOperations(SelectorOperation.Poll, 1)
testableSelector.waitForOperations(SelectorOperation.CloseSelector, 1)
@@ -1086,8 +1085,10 @@ class SocketServerTest {
val errors = new mutable.HashSet[String]
def acceptorStackTraces: scala.collection.Map[Thread, String] = {
-Thread.getAllStackTraces.asScala.filterKeys(_.getName.contains("kafka-socket-acceptor"))
-.mapValues(_.toList.mkString("\n"))
+Thread.getAllStackTraces.asScala.collect {
+case (thread, stacktraceElement) if thread.getName.contains("kafka-socket-acceptor") =>
+thread -> stacktraceElement.mkString("\n")
+}
}
def acceptorBlocked: Boolean = {
@@ -1247,7 +1248,7 @@ class SocketServerTest {
extends Selector(config.socketRequestMaxBytes, config.connectionsMaxIdleMs, config.failedAuthenticationDelayMs,
metrics, time, "socket-server", metricTags.asJava, false, true, channelBuilder, MemoryPool.NONE, new LogContext()) {
-val failures = mutable.Map[SelectorOperation, Exception]()
+val failures = mutable.Map[SelectorOperation, Throwable]()
val operationCounts = mutable.Map[SelectorOperation, Int]().withDefaultValue(0)
val allChannels = mutable.Set[String]()
val allLocallyClosedChannels = mutable.Set[String]()
@@ -1283,7 +1284,7 @@ class SocketServerTest {
@volatile var pollTimeoutOverride: Option[Long] = None
@volatile var pollCallback: () => Unit = () => {}
-def addFailure(operation: SelectorOperation, exception: Option[Exception] = None) {
+def addFailure(operation: SelectorOperation, exception: Option[Throwable] = None) {
failures += operation ->
exception.getOrElse(new IllegalStateException(s"Test exception during $operation"))
}
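The selector-failure changes here come from `ControlThrowable` becoming an abstract class in 2.13: it can no longer be mixed into a `RuntimeException`, so the test builds an anonymous subclass, and the failure map widens from `Exception` to `Throwable` because a bare `ControlThrowable` is not an `Exception`. A minimal sketch (object name is illustrative):

```scala
import scala.util.control.ControlThrowable

object ControlThrowableSketch extends App {
  // 2.12 allowed `new RuntimeException("boom") with ControlThrowable`; in 2.13
  // ControlThrowable is an abstract class, so an anonymous subclass is used.
  val failure: Throwable = new ControlThrowable() {}

  // It is a Throwable but not an Exception, hence Map[_, Throwable] in the test.
  assert(!failure.isInstanceOf[Exception])
}
```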

View File

@@ -31,7 +31,7 @@ class AdvertiseBrokerTest extends ZooKeeperTestHarness {
val brokerId = 0
@After
-override def tearDown() {
+override def tearDown(): Unit = {
TestUtils.shutdownServers(servers)
super.tearDown()
}
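This and the similar signature changes below remove procedure syntax (`def m() { ... }`), which is deprecated and being phased out; spelling out `: Unit =` compiles cleanly on 2.11 through 2.13. For example (method body is illustrative):

```scala
object ProcedureSyntaxMigration {
  // Procedure syntax `def tearDown() { ... }` draws a deprecation warning on 2.13.
  // The explicit result type and `=` is the portable form:
  def tearDown(): Unit = {
    println("shutting down test servers")
  }
}
```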

View File

@@ -38,7 +38,7 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest {
val topic = "topic"
@Test
-def testAlterReplicaLogDirsRequest() {
+def testAlterReplicaLogDirsRequest(): Unit = {
val partitionNum = 5
// Alter replica dir before topic creation

View File

@@ -22,6 +22,8 @@ import java.net.Socket
import java.nio.ByteBuffer
import java.util.Properties
+import scala.collection.Seq
import kafka.api.IntegrationTestHarness
import kafka.network.SocketServer
import org.apache.kafka.common.network.ListenerName

View File

@@ -146,7 +146,7 @@ object CreateTopicsRequestWithPolicyTest {
require(replicationFactor == null, s"replicationFactor should be null, but it is $replicationFactor")
require(replicasAssignments != null, s"replicaAssigments should not be null, but it is $replicasAssignments")
-replicasAssignments.asScala.foreach { case (partitionId, assignment) =>
+replicasAssignments.asScala.toSeq.sortBy { case (tp, _) => tp }.foreach { case (partitionId, assignment) =>
if (assignment.size < 2)
throw new PolicyViolationException("Topic partitions should have at least 2 partitions, received " +
s"${assignment.size} for partition $partitionId")

View File

@@ -31,7 +31,7 @@ import kafka.admin.AdminOperationException
import kafka.zk.ConfigEntityChangeNotificationZNode
import org.apache.kafka.common.TopicPartition
-import scala.collection.Map
+import scala.collection.{Map, Seq}
import scala.collection.JavaConverters._
class DynamicConfigChangeTest extends KafkaServerTestHarness {

View File

@@ -44,7 +44,7 @@ class FetchRequestTest extends BaseRequestTest {
private var producer: KafkaProducer[String, String] = null
-override def tearDown() {
+override def tearDown(): Unit = {
if (producer != null)
producer.close()
super.tearDown()

View File

@@ -30,6 +30,7 @@ import org.easymock.EasyMock
import org.junit.Assert._
import org.junit.{After, Before, Test}
+import scala.collection.Seq
import scala.collection.mutable.{HashMap, Map}
class IsrExpirationTest {

View File

@@ -54,7 +54,7 @@ import org.junit.Assert.{assertEquals, assertNull, assertTrue}
import org.junit.{After, Test}
import scala.collection.JavaConverters._
-import scala.collection.Map
+import scala.collection.{Map, Seq}
class KafkaApisTest {

View File

@@ -18,6 +18,8 @@ package kafka.server
import java.util.Properties
+import scala.collection.Seq
import kafka.utils.TestUtils
import TestUtils._
import kafka.zk.ZooKeeperTestHarness
@@ -73,7 +75,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
}
@Before
-override def setUp() {
+override def setUp(): Unit = {
super.setUp()
configs = TestUtils.createBrokerConfigs(2, zkConnect, enableControlledShutdown = false).map(KafkaConfig.fromProps(_, overridingProps))
@@ -91,7 +93,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
}
@After
-override def tearDown() {
+override def tearDown(): Unit = {
producer.close()
TestUtils.shutdownServers(servers)
super.tearDown()
@@ -238,7 +240,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
assertEquals(hw, hwFile2.read.getOrElse(topicPartition, 0L))
}
-private def sendMessages(n: Int) {
+private def sendMessages(n: Int): Unit = {
(0 until n).map(_ => producer.send(new ProducerRecord(topic, 0, message))).foreach(_.get)
}
}

View File

@@ -31,6 +31,7 @@ import org.junit.{Before, Test}
import org.apache.kafka.test.TestUtils.isValidClusterId
import scala.collection.JavaConverters._
+import scala.collection.Seq
class MetadataRequestTest extends BaseRequestTest {

Some files were not shown because too many files have changed in this diff.