diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 42c72198a03..bdff518b732 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -12,13 +12,6 @@
  */
 package org.apache.kafka.clients.consumer;
 
-import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
-import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceCallback;
 import org.apache.kafka.common.config.AbstractConfig;
@@ -27,6 +20,13 @@ import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
 import org.apache.kafka.common.serialization.Deserializer;
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
+
 /**
  * The consumer configuration keys
  */
@@ -304,7 +304,7 @@ public class ConsumerConfig extends AbstractConfig {
         return newProperties;
     }
 
-    ConsumerConfig(Map<? extends Object, ? extends Object> props) {
+    ConsumerConfig(Map<?, ?> props) {
         super(CONFIG, props);
     }
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
index 305ec8ee2b9..1ca75f83d36 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
@@ -12,15 +12,15 @@
  */
 package org.apache.kafka.clients.consumer;
 
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.AbstractIterator;
+
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.utils.AbstractIterator;
-
 /**
  * A container that holds the list {@link ConsumerRecord} per partition for a
  * particular topic. There is one for every topic returned by a
@@ -55,7 +55,7 @@ public class ConsumerRecords<K, V> implements Iterable<ConsumerRecord<K, V>> {
             throw new IllegalArgumentException("Topic must be non-null.");
         List<List<ConsumerRecord<K, V>>> recs = new ArrayList<List<ConsumerRecord<K, V>>>();
         for (Map.Entry<TopicPartition, List<ConsumerRecord<K, V>>> entry : records.entrySet()) {
-            if (entry.getKey().equals(topic))
+            if (entry.getKey().topic().equals(topic))
                 recs.add(entry.getValue());
         }
         return new ConcatenatedIterable<K, V>(recs);
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index 5a575553d30..187d0004c8c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -242,7 +242,7 @@ public class ProducerConfig extends AbstractConfig {
         return newProperties;
     }
 
-    ProducerConfig(Map<? extends Object, ? extends Object> props) {
+    ProducerConfig(Map<?, ?> props) {
         super(CONFIG, props);
     }
 
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Histogram.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Histogram.java
index cf91f5f9066..c571b4b7176 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Histogram.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Histogram.java
@@ -58,7 +58,7 @@ public class Histogram {
 
     @Override
     public String toString() {
-        StringBuilder b = new StringBuilder('{');
+        StringBuilder b = new StringBuilder("{");
         for (int i = 0; i < this.hist.length - 1; i++) {
             b.append(String.format("%.10f", binScheme.fromBin(i)));
             b.append(':');
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java
index d3394ee669e..dab1a94dd29 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java
@@ -49,7 +49,7 @@ public enum SecurityProtocol {
     }
 
     public static String getName(int id) {
-        return CODE_TO_SECURITY_PROTOCOL.get(id).name;
+        return CODE_TO_SECURITY_PROTOCOL.get((short) id).name;
     }
 
     public static List<String> getNames() {
diff --git a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
index 66442ed3853..db1b0ee9113 100644
--- a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
@@ -12,16 +12,16 @@
  */
 package org.apache.kafka.common.config;
 
-import static org.junit.Assert.fail;
-
-import java.util.Map;
-import java.util.Properties;
-
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
 import org.apache.kafka.common.metrics.MetricsReporter;
 import org.junit.Test;
 
+import java.util.Map;
+import java.util.Properties;
+
+import static org.junit.Assert.fail;
+
 public class AbstractConfigTest {
 
     @Test
@@ -73,7 +73,7 @@ public class AbstractConfigTest {
                                     METRIC_REPORTER_CLASSES_DOC);
         }
 
-        public TestConfig(Map<? extends Object, ? extends Object> props) {
+        public TestConfig(Map<?, ?> props) {
             super(CONFIG, props);
         }
     }
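Two of the hunks above are behaviour fixes, not style: `Histogram.toString` called `new StringBuilder('{')`, which resolves to the capacity constructor because `'{'` widens to the int 123, so the opening brace was never appended; and `SecurityProtocol.getName` looked up a map keyed by boxed `Short` with an unboxed `int`, which boxes to `Integer` and can never match a `Short` key. A minimal, self-contained Scala sketch of both JVM pitfalls (the map contents and names here are illustrative, not Kafka's actual table):

```scala
import java.{lang => jl}

object BoxingPitfalls extends App {
  // A map keyed by boxed Short, like CODE_TO_SECURITY_PROTOCOL.
  val codeToName = new java.util.HashMap[jl.Short, String]()
  codeToName.put(0.toShort, "PLAINTEXT")

  // An Int argument boxes to java.lang.Integer, which never equals a Short key.
  println(codeToName.get(0))          // null -- wrong box type, silent miss
  // Casting to Short first boxes to java.lang.Short, so the lookup hits.
  println(codeToName.get(0.toShort))  // PLAINTEXT

  // '{' widens to the int 123, selecting the StringBuilder(int capacity) overload.
  println(new jl.StringBuilder('{').length)  // 0 -- the brace was never appended
  println(new jl.StringBuilder("{").length)  // 1
}
```

In both cases the fix is to hand the compiler the type it actually needs: a `String` literal for the builder, and an explicit `(short)` cast before boxing.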
diff --git a/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java b/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java
index 8b926340de4..6c335a12024 100644
--- a/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java
@@ -43,8 +43,8 @@ public class ProtocolSerializationTest {
                                  new Field("struct", new Schema(new Field("field", Type.INT32))));
         this.struct = new Struct(this.schema).set("int8", (byte) 1)
                                              .set("int16", (short) 1)
-                                             .set("int32", (int) 1)
-                                             .set("int64", (long) 1)
+                                             .set("int32", 1)
+                                             .set("int64", 1L)
                                              .set("string", "1")
                                              .set("bytes", "1".getBytes())
                                              .set("array", new Object[] {1});
diff --git a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java b/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java
index 1d0e0a91798..c9b90189884 100644
--- a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java
+++ b/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java
@@ -159,7 +159,7 @@ public class KafkaETLContext {
         _response = _consumer.fetch(fetchRequest);
         if(_response != null) {
             _respIterator = new ArrayList<ByteBufferMessageSet>(){{
-                add((ByteBufferMessageSet) _response.messageSet(_request.getTopic(), _request.getPartition()));
+                add(_response.messageSet(_request.getTopic(), _request.getPartition()));
             }}.iterator();
         }
         _requestTime += (System.currentTimeMillis() - tempTime);
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index bbe3362b2b0..acaa6112db9 100755
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -127,7 +127,7 @@ object ReassignPartitionsCommand extends Logging {
     }
     val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, partitionsToBeReassigned.toMap)
     // before starting assignment, output the current replica assignment to facilitate rollback
-    val currentPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, partitionsToBeReassigned.map(_._1.topic).toSeq)
+    val currentPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, partitionsToBeReassigned.map(_._1.topic))
     println("Current partition replica assignment\n\n%s\n\nSave this to use as the --reassignment-json-file option during rollback"
       .format(ZkUtils.getPartitionReassignmentZkData(currentPartitionReplicaAssignment)))
     // start the reassignment
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index 60f0228e673..8e6f18633b2 100755
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -143,7 +143,7 @@ object TopicCommand {
     topics.foreach { topic =>
       try {
         if (Topic.InternalTopics.contains(topic)) {
-          throw new AdminOperationException("Topic %s is a kafka internal topic and is not allowed to be marked for deletion.".format(topic));
+          throw new AdminOperationException("Topic %s is a kafka internal topic and is not allowed to be marked for deletion.".format(topic))
         } else {
           ZkUtils.createPersistentPath(zkClient, ZkUtils.getDeleteTopicPath(topic))
           println("Topic %s is marked for deletion.".format(topic))
diff --git a/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala b/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
index 5be393ab827..fe81635c864 100644
--- a/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
+++ b/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
@@ -37,9 +37,9 @@ object ControlledShutdownRequest extends Logging {
   }
 }
 
-case class ControlledShutdownRequest(val versionId: Short,
-                                     val correlationId: Int,
-                                     val brokerId: Int)
+case class ControlledShutdownRequest(versionId: Short,
+                                     correlationId: Int,
+                                     brokerId: Int)
   extends RequestOrResponse(Some(RequestKeys.ControlledShutdownKey)){
 
   def this(correlationId: Int, brokerId: Int) =
@@ -74,4 +74,4 @@ case class ControlledShutdownRequest(val versionId: Short,
     controlledShutdownRequest.append("; BrokerId: " + brokerId)
     controlledShutdownRequest.toString()
   }
-}
\ No newline at end of file
+}
diff --git a/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala b/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala
index 5e0a1cf4f40..9ecdee73c17 100644
--- a/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala
+++ b/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala
@@ -39,9 +39,9 @@ object ControlledShutdownResponse {
 }
 
 
-case class ControlledShutdownResponse(val correlationId: Int,
-                                      val errorCode: Short = ErrorMapping.NoError,
-                                      val partitionsRemaining: Set[TopicAndPartition])
+case class ControlledShutdownResponse(correlationId: Int,
+                                      errorCode: Short = ErrorMapping.NoError,
+                                      partitionsRemaining: Set[TopicAndPartition])
   extends RequestOrResponse() {
   def sizeInBytes(): Int ={
     var size =
@@ -68,4 +68,4 @@ case class ControlledShutdownResponse(val correlationId: Int,
 
   override def describe(details: Boolean):String = { toString }
-}
\ No newline at end of file
+}
diff --git a/core/src/main/scala/kafka/api/GenericRequestAndHeader.scala b/core/src/main/scala/kafka/api/GenericRequestAndHeader.scala
index f40e19f4b2a..b0c6d7a3398 100644
--- a/core/src/main/scala/kafka/api/GenericRequestAndHeader.scala
+++ b/core/src/main/scala/kafka/api/GenericRequestAndHeader.scala
@@ -36,7 +36,7 @@ private[kafka] abstract class GenericRequestAndHeader(val versionId: Short,
     2 /* version id */ +
     4 /* correlation id */ +
     (2 + clientId.length) /* client id */ +
-    body.sizeOf();
+    body.sizeOf()
   }
 
   override def toString(): String = {
@@ -52,4 +52,4 @@ private[kafka] abstract class GenericRequestAndHeader(val versionId: Short,
     strBuffer.append("; Body: " + body.toString)
     strBuffer.toString()
   }
-}
\ No newline at end of file
+}
diff --git a/core/src/main/scala/kafka/api/GenericResponseAndHeader.scala b/core/src/main/scala/kafka/api/GenericResponseAndHeader.scala
index a4879e26b53..748b5e93523 100644
--- a/core/src/main/scala/kafka/api/GenericResponseAndHeader.scala
+++ b/core/src/main/scala/kafka/api/GenericResponseAndHeader.scala
@@ -29,7 +29,7 @@ private[kafka] abstract class GenericResponseAndHeader(val correlationId: Int,
 
   def sizeInBytes(): Int = {
     4 /* correlation id */ +
-    body.sizeOf();
+    body.sizeOf()
   }
 
   override def toString(): String = {
@@ -43,4 +43,4 @@ private[kafka] abstract class GenericResponseAndHeader(val correlationId: Int,
     strBuffer.append("; Body: " + body.toString)
     strBuffer.toString()
   }
-}
\ No newline at end of file
+}
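The recurring `-case class Foo(val x: ...)` / `+case class Foo(x: ...)` hunks in these request/response classes (and in `PartitionStateInfo`, `CleanerConfig`, `LogConfig`, `KeyedMessage` below) rely on the fact that case-class constructor parameters are promoted to public `val`s automatically, so the explicit keyword is pure noise. A small sketch with a hypothetical class, not one from the patch:

```scala
// Constructor parameters of a case class are public vals by default;
// writing `val` in front of them changes nothing.
case class ShutdownRequest(versionId: Short, correlationId: Int, brokerId: Int)

object CaseClassDemo extends App {
  val req = ShutdownRequest(0, 42, 1)
  // Fields are readable exactly as if declared `val`:
  println(req.correlationId)       // 42
  // and the usual case-class machinery still applies:
  println(req.copy(brokerId = 2))  // ShutdownRequest(0,42,2)
}
```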
diff --git a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
index 2fad585f126..431190ab94a 100644
--- a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
+++ b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
@@ -59,8 +59,8 @@ object PartitionStateInfo {
   }
 }
 
-case class PartitionStateInfo(val leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
-                              val allReplicas: Set[Int]) {
+case class PartitionStateInfo(leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
+                              allReplicas: Set[Int]) {
   def replicationFactor = allReplicas.size
 
   def writeTo(buffer: ByteBuffer) {
@@ -200,4 +200,4 @@ case class LeaderAndIsrRequest (versionId: Short,
     leaderAndIsrRequest.append(";PartitionState:" + partitionStateInfos.mkString(","))
     leaderAndIsrRequest.toString()
   }
-}
\ No newline at end of file
+}
diff --git a/core/src/main/scala/kafka/api/StopReplicaResponse.scala b/core/src/main/scala/kafka/api/StopReplicaResponse.scala
index 3431f3f65d1..2fc3c9585fb 100644
--- a/core/src/main/scala/kafka/api/StopReplicaResponse.scala
+++ b/core/src/main/scala/kafka/api/StopReplicaResponse.scala
@@ -42,9 +42,9 @@ object StopReplicaResponse {
 }
 
 
-case class StopReplicaResponse(val correlationId: Int,
-                               val responseMap: Map[TopicAndPartition, Short],
-                               val errorCode: Short = ErrorMapping.NoError)
+case class StopReplicaResponse(correlationId: Int,
+                               responseMap: Map[TopicAndPartition, Short],
+                               errorCode: Short = ErrorMapping.NoError)
   extends RequestOrResponse() {
   def sizeInBytes(): Int ={
     var size =
@@ -72,4 +72,4 @@ case class StopReplicaResponse(val correlationId: Int,
   }
 
   override def describe(details: Boolean):String = { toString }
-}
\ No newline at end of file
+}
diff --git a/core/src/main/scala/kafka/api/TopicMetadata.scala b/core/src/main/scala/kafka/api/TopicMetadata.scala
index 5e39f453b42..bd866bc8d65 100644
--- a/core/src/main/scala/kafka/api/TopicMetadata.scala
+++ b/core/src/main/scala/kafka/api/TopicMetadata.scala
@@ -109,7 +109,7 @@ object PartitionMetadata {
 }
 
 case class PartitionMetadata(partitionId: Int,
-                             val leader: Option[BrokerEndPoint],
+                             leader: Option[BrokerEndPoint],
                              replicas: Seq[BrokerEndPoint],
                              isr: Seq[BrokerEndPoint] = Seq.empty,
                              errorCode: Short = ErrorMapping.NoError) extends Logging {
diff --git a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
index 7dca09ce637..363bae01752 100644
--- a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
+++ b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
@@ -46,10 +46,10 @@ object TopicMetadataRequest extends Logging {
   }
 }
 
-case class TopicMetadataRequest(val versionId: Short,
-                                val correlationId: Int,
-                                val clientId: String,
-                                val topics: Seq[String])
+case class TopicMetadataRequest(versionId: Short,
+                                correlationId: Int,
+                                clientId: String,
+                                topics: Seq[String])
   extends RequestOrResponse(Some(RequestKeys.MetadataKey)){
 
   def this(topics: Seq[String], correlationId: Int) =
@@ -93,4 +93,4 @@ case class TopicMetadataRequest(val versionId: Short,
     topicMetadataRequest.append("; Topics: " + topics.mkString(","))
     topicMetadataRequest.toString()
   }
-}
\ No newline at end of file
+}
diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala
index b66424b2304..62394c0d381 100755
--- a/core/src/main/scala/kafka/client/ClientUtils.scala
+++ b/core/src/main/scala/kafka/client/ClientUtils.scala
@@ -74,7 +74,7 @@ object ClientUtils extends Logging{
     } else {
       debug("Successfully fetched metadata for %d topic(s) %s".format(topics.size, topics))
     }
-    return topicMetadataResponse
+    topicMetadataResponse
   }
 
   /**
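The `ClientUtils` hunk above is the first of many in this patch (`KafkaServer`, `OffsetIndex`, `ImportZkOffsets`, `ZkUtils`, ...) that drop an explicit `return`: a Scala method's result is simply the value of its last expression, and `return` adds noise, forces an explicit result type, and inside closures is implemented via a `NonLocalReturnControl` exception. A short sketch of the idiom with hypothetical methods:

```scala
object LastExpression extends App {
  // Idiomatic: the final expression is the result; no `return` needed.
  def fetchGreeting(name: String): String = {
    val greeting = s"hello, $name"
    greeting                     // equivalent to `return greeting`, minus the noise
  }

  // The same holds for each branch of if/else (and for try/finally,
  // as in the OffsetIndex hunk further down):
  def firstNonNegative(xs: List[Int]): Int =
    if (xs.isEmpty) -1
    else xs.find(_ >= 0).getOrElse(-1)

  println(fetchGreeting("kafka"))  // hello, kafka
  println(firstNonNegative(Nil))   // -1
}
```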
diff --git a/core/src/main/scala/kafka/cluster/EndPoint.scala b/core/src/main/scala/kafka/cluster/EndPoint.scala
index 3286f6d4f25..e9008e6d758 100644
--- a/core/src/main/scala/kafka/cluster/EndPoint.scala
+++ b/core/src/main/scala/kafka/cluster/EndPoint.scala
@@ -68,7 +68,7 @@ case class EndPoint(host: String, port: Int, protocolType: SecurityProtocol) {
   def writeTo(buffer: ByteBuffer): Unit = {
     buffer.putInt(port)
     writeShortString(buffer, host)
-    buffer.putShort(protocolType.id.toShort)
+    buffer.putShort(protocolType.id)
   }
 
   def sizeInBytes: Int =
diff --git a/core/src/main/scala/kafka/common/ConsumerReblanceFailedException.scala b/core/src/main/scala/kafka/common/ConsumerReblanceFailedException.scala
deleted file mode 100644
index ae5018daa97..00000000000
--- a/core/src/main/scala/kafka/common/ConsumerReblanceFailedException.scala
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.common
-
-/**
- * Thrown when a request is made for broker but no brokers with that topic
- * exist.
- */
-class ConsumerRebalanceFailedException(message: String) extends RuntimeException(message) {
-  def this() = this(null)
-}
\ No newline at end of file
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 97acdb23f6e..1b223101e2d 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -309,8 +309,8 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging
     }
     updateMetadataRequestMap.clear()
     stopReplicaRequestMap foreach { case(broker, replicaInfoList) =>
-      val stopReplicaWithDelete = replicaInfoList.filter(p => p.deletePartition == true).map(i => i.replica).toSet
-      val stopReplicaWithoutDelete = replicaInfoList.filter(p => p.deletePartition == false).map(i => i.replica).toSet
+      val stopReplicaWithDelete = replicaInfoList.filter(_.deletePartition).map(_.replica).toSet
+      val stopReplicaWithoutDelete = replicaInfoList.filterNot(_.deletePartition).map(_.replica).toSet
       debug("The stop replica request (delete = true) sent to broker %d is %s"
         .format(broker, stopReplicaWithDelete.mkString(",")))
       debug("The stop replica request (delete = false) sent to broker %d is %s"
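The `ControllerChannelManager` hunk above compresses two common predicate smells: comparing a `Boolean` field against `true`/`false` instead of using it (or `filterNot`) directly, and — in the `ReplicaStateMachine` and `DelayedRebalance` hunks that follow — folding booleans with `&&` where `forall` states the intent and short-circuits. A sketch with made-up data:

```scala
object PredicateCleanup extends App {
  case class ReplicaInfo(replica: Int, deletePartition: Boolean)
  val infos = List(ReplicaInfo(0, true), ReplicaInfo(1, false), ReplicaInfo(2, true))

  // Comparing a Boolean to `true`/`false` is redundant; pass the predicate directly.
  val withDelete    = infos.filter(_.deletePartition).map(_.replica).toSet
  val withoutDelete = infos.filterNot(_.deletePartition).map(_.replica).toSet
  println(withDelete)     // Set(0, 2)
  println(withoutDelete)  // Set(1)

  // Folding booleans with && is just `forall` -- and `forall` stops at the
  // first false, while the fold always walks the whole collection.
  println(infos.forall(_.deletePartition))                             // false
  println(infos.foldLeft(true)((acc, i) => acc && i.deletePartition))  // same answer, more ceremony
}
```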
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 3a09377611b..a6351163f5b 100755
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -647,7 +647,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt
    */
   def startup() = {
     inLock(controllerContext.controllerLock) {
-      info("Controller starting up");
+      info("Controller starting up")
       registerSessionExpirationListener()
       isRunning = true
       controllerElector.startup
@@ -1326,7 +1326,7 @@ case class PartitionAndReplica(topic: String, partition: Int, replica: Int) {
   }
 }
 
-case class LeaderIsrAndControllerEpoch(val leaderAndIsr: LeaderAndIsr, controllerEpoch: Int) {
+case class LeaderIsrAndControllerEpoch(leaderAndIsr: LeaderAndIsr, controllerEpoch: Int) {
   override def toString(): String = {
     val leaderAndIsrInfo = new StringBuilder
     leaderAndIsrInfo.append("(Leader:" + leaderAndIsr.leader)
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index e5c56e0618a..3a44fdc4e6b 100755
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -282,7 +282,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
     val replicasForTopic = controller.controllerContext.replicasForTopic(topic)
     val replicaStatesForTopic = replicasForTopic.map(r => (r, replicaState(r))).toMap
     debug("Are all replicas for topic %s deleted %s".format(topic, replicaStatesForTopic))
-    replicaStatesForTopic.foldLeft(true)((deletionState, r) => deletionState && r._2 == ReplicaDeletionSuccessful)
+    replicaStatesForTopic.forall(_._2 == ReplicaDeletionSuccessful)
   }
 
   def isAtLeastOneReplicaInDeletionStartedState(topic: String): Boolean = {
diff --git a/core/src/main/scala/kafka/coordinator/DelayedRebalance.scala b/core/src/main/scala/kafka/coordinator/DelayedRebalance.scala
index 8defa2e41c9..60fbdae164f 100644
--- a/core/src/main/scala/kafka/coordinator/DelayedRebalance.scala
+++ b/core/src/main/scala/kafka/coordinator/DelayedRebalance.scala
@@ -41,8 +41,7 @@ class DelayedRebalance(sessionTimeout: Long,
 
   /* check if all known consumers have requested to re-join group */
   override def tryComplete(): Boolean = {
-    allConsumersJoinedGroup.set(groupRegistry.memberRegistries.values.foldLeft
-      (true) ((agg, cur) => agg && cur.joinGroupReceived.get()))
+    allConsumersJoinedGroup.set(groupRegistry.memberRegistries.values.forall(_.joinGroupReceived.get()))
 
     if (allConsumersJoinedGroup.get())
       forceComplete()
diff --git a/core/src/main/scala/kafka/javaapi/Implicits.scala b/core/src/main/scala/kafka/javaapi/Implicits.scala
index 8baf4d46802..c69b0a3c3d8 100644
--- a/core/src/main/scala/kafka/javaapi/Implicits.scala
+++ b/core/src/main/scala/kafka/javaapi/Implicits.scala
@@ -47,9 +47,4 @@ private[javaapi] object Implicits extends Logging {
     }
   }
 
-  // used explicitly by ByteBufferMessageSet constructor as due to SI-4141 which affects Scala 2.8.1, implicits are not visible in constructors
-  implicit def javaListToScalaBuffer[A](l: java.util.List[A]) = {
-    import scala.collection.JavaConversions._
-    l: collection.mutable.Buffer[A]
-  }
 }
diff --git a/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
index 0125565c84a..df3027958af 100644
--- a/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
+++ b/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
@@ -19,14 +19,14 @@ package kafka.javaapi.message
 import java.util.concurrent.atomic.AtomicLong
 import java.nio.ByteBuffer
 import kafka.message._
-import kafka.javaapi.Implicits.javaListToScalaBuffer
+
+import scala.collection.JavaConverters._
 
 class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet {
   private val underlying: kafka.message.ByteBufferMessageSet = new kafka.message.ByteBufferMessageSet(buffer)
 
   def this(compressionCodec: CompressionCodec, messages: java.util.List[Message]) {
-    // due to SI-4141 which affects Scala 2.8.1, implicits are not visible in constructors and must be used explicitly
-    this(new kafka.message.ByteBufferMessageSet(compressionCodec, new AtomicLong(0), javaListToScalaBuffer(messages).toSeq : _*).buffer)
+    this(new kafka.message.ByteBufferMessageSet(compressionCodec, new AtomicLong(0), messages.asScala: _*).buffer)
   }
 
   def this(messages: java.util.List[Message]) {
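The `ByteBufferMessageSet` hunk above retires a hand-written implicit conversion that existed only to dodge SI-4141 (implicits were not visible in constructors on Scala 2.8.1) in favour of the standard library's explicit `.asScala`. A sketch of the conversion, assuming nothing beyond `scala.collection.JavaConverters`:

```scala
import scala.collection.JavaConverters._

object ConvertersDemo extends App {
  val javaList: java.util.List[String] = java.util.Arrays.asList("a", "b", "c")

  // .asScala is an explicit, zero-copy wrapper -- no implicit-conversion magic,
  // so it also works in places where implicits are awkward (e.g. constructors).
  val buffer: scala.collection.mutable.Buffer[String] = javaList.asScala
  println(buffer.mkString(","))           // a,b,c

  // Varargs expansion like the ByteBufferMessageSet constructor uses:
  def takeAll(items: String*): Int = items.size
  println(takeAll(javaList.asScala: _*))  // 3
}
```

Being explicit is the design point: the old `JavaConversions`-style silent conversions made it easy to wrap collections accidentally, which is why the explicit `asScala`/`asJava` converters won out.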
diff --git a/core/src/main/scala/kafka/log/CleanerConfig.scala b/core/src/main/scala/kafka/log/CleanerConfig.scala
index ade838672d6..782bc9adb22 100644
--- a/core/src/main/scala/kafka/log/CleanerConfig.scala
+++ b/core/src/main/scala/kafka/log/CleanerConfig.scala
@@ -29,13 +29,13 @@ package kafka.log
  * @param enableCleaner Allows completely disabling the log cleaner
  * @param hashAlgorithm The hash algorithm to use in key comparison.
  */
-case class CleanerConfig(val numThreads: Int = 1,
-                         val dedupeBufferSize: Long = 4*1024*1024L,
-                         val dedupeBufferLoadFactor: Double = 0.9d,
-                         val ioBufferSize: Int = 1024*1024,
-                         val maxMessageSize: Int = 32*1024*1024,
-                         val maxIoBytesPerSecond: Double = Double.MaxValue,
-                         val backOffMs: Long = 15 * 1000,
-                         val enableCleaner: Boolean = true,
-                         val hashAlgorithm: String = "MD5") {
-}
\ No newline at end of file
+case class CleanerConfig(numThreads: Int = 1,
+                         dedupeBufferSize: Long = 4*1024*1024L,
+                         dedupeBufferLoadFactor: Double = 0.9d,
+                         ioBufferSize: Int = 1024*1024,
+                         maxMessageSize: Int = 32*1024*1024,
+                         maxIoBytesPerSecond: Double = Double.MaxValue,
+                         backOffMs: Long = 15 * 1000,
+                         enableCleaner: Boolean = true,
+                         hashAlgorithm: String = "MD5") {
+}
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 12eacdfa7b5..abea8b25189 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -72,7 +72,7 @@ class LogCleaner(val config: CleanerConfig,
                  time: Time = SystemTime) extends Logging with KafkaMetricsGroup {
 
   /* for managing the state of partitions being cleaned. package-private to allow access in tests */
-  private[log] val cleanerManager = new LogCleanerManager(logDirs, logs);
+  private[log] val cleanerManager = new LogCleanerManager(logDirs, logs)
 
   /* a throttle used to limit the I/O of all the cleaner threads to a user-specified maximum rate */
   private val throttler = new Throttler(desiredRatePerSec = config.maxIoBytesPerSecond,
@@ -622,4 +622,4 @@ private case class LogToClean(topicPartition: TopicAndPartition, log: Log, first
   val cleanableRatio = dirtyBytes / totalBytes.toDouble
   def totalBytes = cleanBytes + dirtyBytes
   override def compare(that: LogToClean): Int = math.signum(this.cleanableRatio - that.cleanableRatio).toInt
-}
\ No newline at end of file
+}
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala
index da55a348f37..a907da09e1c 100755
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -63,23 +63,23 @@ object Defaults {
  * @param compressionType compressionType for a given topic
  *
  */
-case class LogConfig(val segmentSize: Int = Defaults.SegmentSize,
-                     val segmentMs: Long = Defaults.SegmentMs,
-                     val segmentJitterMs: Long = Defaults.SegmentJitterMs,
-                     val flushInterval: Long = Defaults.FlushInterval,
-                     val flushMs: Long = Defaults.FlushMs,
-                     val retentionSize: Long = Defaults.RetentionSize,
-                     val retentionMs: Long = Defaults.RetentionMs,
-                     val maxMessageSize: Int = Defaults.MaxMessageSize,
-                     val maxIndexSize: Int = Defaults.MaxIndexSize,
-                     val indexInterval: Int = Defaults.IndexInterval,
-                     val fileDeleteDelayMs: Long = Defaults.FileDeleteDelayMs,
-                     val deleteRetentionMs: Long = Defaults.DeleteRetentionMs,
-                     val minCleanableRatio: Double = Defaults.MinCleanableDirtyRatio,
-                     val compact: Boolean = Defaults.Compact,
-                     val uncleanLeaderElectionEnable: Boolean = Defaults.UncleanLeaderElectionEnable,
-                     val minInSyncReplicas: Int = Defaults.MinInSyncReplicas,
-                     val compressionType: String = Defaults.CompressionType) {
+case class LogConfig(segmentSize: Int = Defaults.SegmentSize,
+                     segmentMs: Long = Defaults.SegmentMs,
+                     segmentJitterMs: Long = Defaults.SegmentJitterMs,
+                     flushInterval: Long = Defaults.FlushInterval,
+                     flushMs: Long = Defaults.FlushMs,
+                     retentionSize: Long = Defaults.RetentionSize,
+                     retentionMs: Long = Defaults.RetentionMs,
+                     maxMessageSize: Int = Defaults.MaxMessageSize,
+                     maxIndexSize: Int = Defaults.MaxIndexSize,
+                     indexInterval: Int = Defaults.IndexInterval,
+                     fileDeleteDelayMs: Long = Defaults.FileDeleteDelayMs,
+                     deleteRetentionMs: Long = Defaults.DeleteRetentionMs,
+                     minCleanableRatio: Double = Defaults.MinCleanableDirtyRatio,
+                     compact: Boolean = Defaults.Compact,
+                     uncleanLeaderElectionEnable: Boolean = Defaults.UncleanLeaderElectionEnable,
+                     minInSyncReplicas: Int = Defaults.MinInSyncReplicas,
+                     compressionType: String = Defaults.CompressionType) {
 
   def toProps: Properties = {
     val props = new Properties()
diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala
index 4ab22deec99..a1082aefd60 100755
--- a/core/src/main/scala/kafka/log/OffsetIndex.scala
+++ b/core/src/main/scala/kafka/log/OffsetIndex.scala
@@ -375,10 +375,10 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
     if(Os.isWindows)
       lock.lock()
     try {
-      return fun
+      fun
     } finally {
       if(Os.isWindows)
        lock.unlock()
     }
   }
-}
\ No newline at end of file
+}
diff --git a/core/src/main/scala/kafka/log/OffsetMap.scala b/core/src/main/scala/kafka/log/OffsetMap.scala
index 2940e47cb88..303aad5ba76 100755
--- a/core/src/main/scala/kafka/log/OffsetMap.scala
+++ b/core/src/main/scala/kafka/log/OffsetMap.scala
@@ -69,7 +69,7 @@ class SkimpyOffsetMap(val memory: Int, val hashAlgorithm: String = "MD5") extend
   /**
    * The maximum number of entries this map can contain
    */
-  val slots: Int = (memory / bytesPerEntry).toInt
+  val slots: Int = memory / bytesPerEntry
 
   /**
    * Associate this offset to the given key.
@@ -177,4 +177,4 @@ class SkimpyOffsetMap(val memory: Int, val hashAlgorithm: String = "MD5") extend
     digest.digest(buffer, 0, hashSize)
   }
 
-}
\ No newline at end of file
+}
diff --git a/core/src/main/scala/kafka/log/OffsetPosition.scala b/core/src/main/scala/kafka/log/OffsetPosition.scala
index 6cefde4d90e..24b6dcf0bb7 100644
--- a/core/src/main/scala/kafka/log/OffsetPosition.scala
+++ b/core/src/main/scala/kafka/log/OffsetPosition.scala
@@ -22,4 +22,4 @@ package kafka.log
  * in some log file of the beginning of the message set entry with the
  * given offset.
  */
-case class OffsetPosition(val offset: Long, val position: Int)
\ No newline at end of file
+case class OffsetPosition(offset: Long, position: Int)
diff --git a/core/src/main/scala/kafka/message/MessageSet.scala b/core/src/main/scala/kafka/message/MessageSet.scala
index f1b8432f4a9..28b56e68cfd 100644
--- a/core/src/main/scala/kafka/message/MessageSet.scala
+++ b/core/src/main/scala/kafka/message/MessageSet.scala
@@ -43,7 +43,7 @@ object MessageSet {
     var size = 0
     val iter = messages.iterator
     while(iter.hasNext) {
-      val message = iter.next.asInstanceOf[Message]
+      val message = iter.next
       size += entrySize(message)
     }
     size
diff --git a/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala b/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala
index 30fd0ea3ce0..0d6da3466fd 100755
--- a/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala
+++ b/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala
@@ -52,7 +52,7 @@ object KafkaMetricsReporter {
 
   def startReporters (verifiableProps: VerifiableProperties) {
     ReporterStarted synchronized {
-      if (ReporterStarted.get() == false) {
+      if (!ReporterStarted.get()) {
        val metricsConfig = new KafkaMetricsConfig(verifiableProps)
        if(metricsConfig.reporters.size > 0) {
          metricsConfig.reporters.foreach(reporterType => {
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index c5fec000f57..b9bedde336a 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -87,7 +87,7 @@ class SocketServer(val brokerId: Int,
                                     quotas,
                                     connectionsMaxIdleMs,
                                     portToProtocol)
-      Utils.newThread("kafka-network-thread-%d-%d".format(brokerId, i), processors(i), false).start();
+      Utils.newThread("kafka-network-thread-%d-%d".format(brokerId, i), processors(i), false).start()
     }
   }
 
@@ -244,7 +244,7 @@ private[kafka] class Acceptor(val host: String,
   * Accept loop that checks for new connection attempts
   */
  def run() {
-    serverChannel.register(selector, SelectionKey.OP_ACCEPT);
+    serverChannel.register(selector, SelectionKey.OP_ACCEPT)
    startupComplete()
    var currentProcessor = 0
    while(isRunning) {
@@ -480,7 +480,7 @@ private[kafka] class Processor(val id: Int,
         key.attach(receive)
       }
       val read = receive.readFrom(socketChannel)
-      val address = socketChannel.socket.getRemoteSocketAddress();
+      val address = socketChannel.socket.getRemoteSocketAddress()
       trace(read + " bytes read from " + address)
       if(read < 0) {
         close(key)
diff --git a/core/src/main/scala/kafka/producer/KeyedMessage.scala b/core/src/main/scala/kafka/producer/KeyedMessage.scala
index 388bc9bbd9a..dbcf29515bb 100644
--- a/core/src/main/scala/kafka/producer/KeyedMessage.scala
+++ b/core/src/main/scala/kafka/producer/KeyedMessage.scala
@@ -21,7 +21,7 @@ package kafka.producer
  * A topic, key, and value.
  * If a partition key is provided it will override the key for the purpose of partitioning but will not be stored.
  */
-case class KeyedMessage[K, V](val topic: String, val key: K, val partKey: Any, val message: V) {
+case class KeyedMessage[K, V](topic: String, key: K, partKey: Any, message: V) {
   if(topic == null)
     throw new IllegalArgumentException("Topic cannot be null.")
 
@@ -39,4 +39,4 @@ case class KeyedMessage[K, V](val topic: String, val key: K, val partKey: Any, v
   }
 
   def hasKey = key != null
-}
\ No newline at end of file
+}
diff --git a/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala b/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
index 01e8f72b442..6e8d68dfae2 100755
--- a/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
+++ b/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
@@ -72,7 +72,7 @@ class BrokerMetadataCheckpoint(val file: File) extends Logging {
       }
     } catch {
       case e: FileNotFoundException =>
-        warn("No meta.properties file under dir %s".format(file.getAbsolutePath(), e.getMessage))
+        warn("No meta.properties file under dir %s".format(file.getAbsolutePath()))
         None
       case e1: Exception =>
         error("Failed to read meta.properties file under dir %s due to %s".format(file.getAbsolutePath(), e1.getMessage))
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index c63f4ba9d62..d401bacdcba 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -458,7 +458,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
       checkpoint.write(new BrokerMetadata(brokerId))
     }
 
-    return brokerId
+    brokerId
   }
 
   private def generateBrokerId: Int = {
diff --git a/core/src/main/scala/kafka/server/LogOffsetMetadata.scala b/core/src/main/scala/kafka/server/LogOffsetMetadata.scala
index a868334e0e5..00b60fe152c 100644
--- a/core/src/main/scala/kafka/server/LogOffsetMetadata.scala
+++ b/core/src/main/scala/kafka/server/LogOffsetMetadata.scala
@@ -26,7 +26,7 @@ object LogOffsetMetadata {
 
   class OffsetOrdering extends Ordering[LogOffsetMetadata] {
     override def compare(x: LogOffsetMetadata , y: LogOffsetMetadata ): Int = {
-      return x.offsetDiff(y).toInt
+      x.offsetDiff(y).toInt
     }
   }
 
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 8ddd325015d..59c9bc3ac3a 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -242,7 +242,7 @@ class ReplicaManager(val config: KafkaConfig,
   def getReplicaOrException(topic: String, partition: Int): Replica = {
     val replicaOpt = getReplica(topic, partition)
     if(replicaOpt.isDefined)
-      return replicaOpt.get
+      replicaOpt.get
     else
       throw new ReplicaNotAvailableException("Replica %d is not available for partition [%s,%d]".format(config.brokerId, topic, partition))
   }
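The `BrokerMetadataCheckpoint` hunk above fixes a quiet trap rather than style: the format string has a single `%s`, but two arguments were supplied, and `String.format` silently ignores surplus arguments, so `e.getMessage` was never printed anyway. The fix simply drops the dead argument. A short sketch of the trap with hypothetical values:

```scala
object FormatArgs extends App {
  val path   = "/tmp/kafka-logs"
  val detail = "file is missing"

  // Only one %s: the second argument is silently discarded, not an error.
  println("No meta.properties file under dir %s".format(path, detail))
  // -> No meta.properties file under dir /tmp/kafka-logs

  // If both values are actually wanted, the format string must say so:
  println("No meta.properties file under dir %s: %s".format(path, detail))
  // -> No meta.properties file under dir /tmp/kafka-logs: file is missing
}
```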
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index 80b26744a79..bba39904c5c 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -69,7 +69,7 @@ object ConsoleConsumer extends Logging {
           .withRequiredArg
           .describedAs("prop")
           .ofType(classOf[String])
-    val deleteConsumerOffsetsOpt = parser.accepts("delete-consumer-offsets", "If specified, the consumer path in zookeeper is deleted when starting up");
+    val deleteConsumerOffsetsOpt = parser.accepts("delete-consumer-offsets", "If specified, the consumer path in zookeeper is deleted when starting up")
     val resetBeginningOpt = parser.accepts("from-beginning", "If the consumer does not already have an established offset to consume from, " +
       "start with the earliest message present in the log rather than the latest message.")
     val maxMessagesOpt = parser.accepts("max-messages", "The maximum number of messages to consume before exiting. If not set, consumption is continual.")
@@ -209,7 +209,7 @@ object ConsoleConsumer extends Logging {
 
   def checkZkPathExists(zkUrl: String, path: String): Boolean = {
     try {
-      val zk = new ZkClient(zkUrl, 30*1000,30*1000, ZKStringSerializer);
+      val zk = new ZkClient(zkUrl, 30*1000,30*1000, ZKStringSerializer)
       zk.exists(path)
     } catch {
       case _: Throwable => false
diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
index 00265f9f4a4..6971e6e4dcc 100644
--- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
@@ -74,7 +74,7 @@ object ConsoleProducer {
 
   def getOldProducerProps(config: ProducerConfig): Properties = {
 
-    val props = new Properties;
+    val props = new Properties
 
     props.putAll(config.extraProducerProps)
 
@@ -100,7 +100,7 @@ object ConsoleProducer {
 
   def getNewProducerProps(config: ProducerConfig): Properties = {
 
-    val props = new Properties;
+    val props = new Properties
 
     props.putAll(config.extraProducerProps)
 
diff --git a/core/src/main/scala/kafka/tools/ExportZkOffsets.scala b/core/src/main/scala/kafka/tools/ExportZkOffsets.scala
index 4d051bc2db1..ce14bbc7f00 100644
--- a/core/src/main/scala/kafka/tools/ExportZkOffsets.scala
+++ b/core/src/main/scala/kafka/tools/ExportZkOffsets.scala
@@ -114,11 +114,10 @@ object ExportZkOffsets extends Logging {
     }
   }
 
-  private def getBrokeridPartition(zkClient: ZkClient, consumerGroup: String, topic: String): List[String] = {
-    return ZkUtils.getChildrenParentMayNotExist(zkClient, "/consumers/%s/offsets/%s".format(consumerGroup, topic)).toList
-  }
+  private def getBrokeridPartition(zkClient: ZkClient, consumerGroup: String, topic: String): List[String] =
+    ZkUtils.getChildrenParentMayNotExist(zkClient, "/consumers/%s/offsets/%s".format(consumerGroup, topic)).toList
 
-  private def getTopicsList(zkClient: ZkClient, consumerGroup: String): List[String] = {
-    return ZkUtils.getChildren(zkClient, "/consumers/%s/offsets".format(consumerGroup)).toList
-  }
+  private def getTopicsList(zkClient: ZkClient, consumerGroup: String): List[String] =
+    ZkUtils.getChildren(zkClient, "/consumers/%s/offsets".format(consumerGroup)).toList
+
 }
diff --git a/core/src/main/scala/kafka/tools/ImportZkOffsets.scala b/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
index abe09721b13..598350d7181 100644
--- a/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
+++ b/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
@@ -89,7 +89,7 @@ object ImportZkOffsets extends Logging {
       s = br.readLine()
     }
 
-    return partOffsetsMap
+    partOffsetsMap
   }
 
   private def updateZkOffsets(zkClient: ZkClient, partitionOffsets: Map[String,String]): Unit = {
diff --git a/core/src/main/scala/kafka/tools/JmxTool.scala b/core/src/main/scala/kafka/tools/JmxTool.scala
index 1d1a120c45f..c2b2030ccfc 100644
--- a/core/src/main/scala/kafka/tools/JmxTool.scala
+++ b/core/src/main/scala/kafka/tools/JmxTool.scala
@@ -103,7 +103,7 @@ object JmxTool extends Logging {
 
     // print csv header
     val keys = List("time") ++ queryAttributes(mbsc, names, attributesWhitelist).keys.toArray.sorted
-    if(keys.size == numExpectedAttributes.map(_._2).foldLeft(0)(_ + _) + 1)
+    if(keys.size == numExpectedAttributes.map(_._2).sum + 1)
       println(keys.map("\"" + _ + "\"").mkString(","))
 
     while(true) {
@@ -113,7 +113,7 @@ object JmxTool extends Logging {
         case Some(dFormat) => dFormat.format(new Date)
         case None => System.currentTimeMillis().toString
       }
-      if(attributes.keySet.size == numExpectedAttributes.map(_._2).foldLeft(0)(_ + _) + 1)
+      if(attributes.keySet.size == numExpectedAttributes.map(_._2).sum + 1)
        println(keys.map(attributes(_)).mkString(","))
       val sleep = max(0, interval - (System.currentTimeMillis - start))
       Thread.sleep(sleep)
@@ -137,4 +137,4 @@ object JmxTool extends Logging {
     attributes
   }
 
-}
\ No newline at end of file
+}
diff --git a/core/src/main/scala/kafka/tools/ProducerPerformance.scala b/core/src/main/scala/kafka/tools/ProducerPerformance.scala
index bc25cd2f371..71b1bd5e995 100644
--- a/core/src/main/scala/kafka/tools/ProducerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/ProducerPerformance.scala
@@ -235,7 +235,7 @@ object ProducerPerformance extends Logging {
 
       val seqMsgString = String.format("%1$-" + msgSize + "s", msgHeader).replace(' ', 'x')
       debug(seqMsgString)
-      return seqMsgString.getBytes()
+      seqMsgString.getBytes()
     }
 
     private def generateProducerData(topic: String, messageId: Long): Array[Byte] = {
diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
index dec9516c76f..1c2023c0a7e 100755
--- a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
+++ b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
@@ -148,7 +148,7 @@ object SimpleConsumerShell extends Logging {
     if(replicaId == UseLeaderReplica) {
       replicaOpt = partitionMetadataOpt.get.leader
       if(!replicaOpt.isDefined) {
-        System.err.println("Error: user speicifies to fetch from leader for partition (%s, %d) which has not been elected yet".format(replicaId, topic, partitionId))
+        System.err.println("Error: user specifies to fetch from leader for partition (%s, %d) which has not been elected yet".format(topic, partitionId))
         System.exit(1)
       }
     }
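The `JmxTool` hunks above swap `foldLeft(0)(_ + _)` for the built-in `sum`, which names the operation instead of spelling out its mechanics — the same spirit as the `forall` changes elsewhere in the patch. A sketch with made-up data:

```scala
object Aggregators extends App {
  val expectedAttributes = Map("a" -> 2, "b" -> 3, "c" -> 5)

  // Before: thread an accumulator through by hand.
  println(expectedAttributes.map(_._2).foldLeft(0)(_ + _))  // 10
  // After: state the intent directly.
  println(expectedAttributes.map(_._2).sum)                 // 10
  // `values` skips building the intermediate collection of counts entirely:
  println(expectedAttributes.values.sum)                    // 10
}
```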
diff --git a/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala b/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
index aef8361b73a..4fb519bf6d0 100644
--- a/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
+++ b/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
@@ -79,9 +79,7 @@ object VerifyConsumerRebalance extends Logging {
     val consumersPerTopicMap = ZkUtils.getConsumersPerTopic(zkClient, group, excludeInternalTopics = false)
     val partitionsPerTopicMap = ZkUtils.getPartitionsForTopics(zkClient, consumersPerTopicMap.keySet.toSeq)
 
-    partitionsPerTopicMap.foreach { partitionsForTopic =>
-      val topic = partitionsForTopic._1
-      val partitions = partitionsForTopic._2
+    partitionsPerTopicMap.foreach { case (topic, partitions) =>
       val topicDirs = new ZKGroupTopicDirs(group, topic)
       info("Alive partitions for topic %s are %s ".format(topic, partitions.toString))
       info("Alive consumers for topic %s => %s ".format(topic, consumersPerTopicMap.get(topic)))
@@ -95,8 +93,8 @@ object VerifyConsumerRebalance extends Logging {
 
       // for each available partition for topic, check if an owner exists
       partitions.foreach { partition =>
-        // check if there is a node for [partition]
-        if(!partitionsWithOwners.exists(p => p.equals(partition))) {
+        // check if there is a node for [partition]
+        if(!partitionsWithOwners.contains(partition.toString)) {
           error("No owner for partition [%s,%d]".format(topic, partition))
           rebalanceSucceeded = false
         }
diff --git a/core/src/main/scala/kafka/utils/CoreUtils.scala b/core/src/main/scala/kafka/utils/CoreUtils.scala
index c473a034bc3..98abc4569ed 100755
--- a/core/src/main/scala/kafka/utils/CoreUtils.scala
+++ b/core/src/main/scala/kafka/utils/CoreUtils.scala
@@ -102,7 +102,7 @@ object CoreUtils extends Logging {
    * Recursively delete the list of files/directories and any subfiles (if any exist)
    * @param files sequence of files to be deleted
    */
-  def rm(files: Seq[String]): Unit = files.map(f => rm(new File(f)))
+  def rm(files: Seq[String]): Unit = files.foreach(f => rm(new File(f)))
 
   /**
    * Recursively delete the given file/directory and any subfiles (if any exist)
@@ -230,7 +230,7 @@ object CoreUtils extends Logging {
   def createObject[T<:AnyRef](className: String, args: AnyRef*): T = {
     val klass = Class.forName(className).asInstanceOf[Class[T]]
     val constructor = klass.getConstructor(args.map(_.getClass): _*)
-    constructor.newInstance(args: _*).asInstanceOf[T]
+    constructor.newInstance(args: _*)
  }
 
   /**
diff --git a/core/src/main/scala/kafka/utils/Mx4jLoader.scala b/core/src/main/scala/kafka/utils/Mx4jLoader.scala
index 7417897c591..aa120abc0bc 100644
--- a/core/src/main/scala/kafka/utils/Mx4jLoader.scala
+++ b/core/src/main/scala/kafka/utils/Mx4jLoader.scala
@@ -39,7 +39,7 @@ object Mx4jLoader extends Logging {
   val address = props.getString("mx4jaddress", "0.0.0.0")
   val port = props.getInt("mx4jport", 8082)
   try {
-    debug("Will try to load MX4j now, if it's in the classpath");
+    debug("Will try to load MX4j now, if it's in the classpath")
 
     val mbs = ManagementFactory.getPlatformMBeanServer()
     val processorName = new ObjectName("Server:name=XSLTProcessor")
@@ -62,10 +62,10 @@ object Mx4jLoader extends Logging {
   }
   catch {
     case e: ClassNotFoundException => {
-      info("Will not load MX4J, mx4j-tools.jar is not in the classpath");
+      info("Will not load MX4J, mx4j-tools.jar is not in the classpath")
     }
     case e: Throwable => {
-      warn("Could not start register mbean in JMX", e);
+      warn("Could not start register mbean in JMX", e)
     }
   }
   false
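The `VerifyConsumerRebalance` hunk above replaces positional `._1`/`._2` tuple access with a `case (topic, partitions)` pattern, giving each half of the pair a real name at the point of use. A sketch with made-up data:

```scala
object Destructuring extends App {
  val partitionsPerTopic = Map("payments" -> Seq(0, 1), "audit" -> Seq(0))

  // A partial-function literal destructures each (key, value) pair in place,
  // replacing opaque x._1 / x._2 access with meaningful names.
  partitionsPerTopic.foreach { case (topic, partitions) =>
    println(s"topic $topic has partitions ${partitions.mkString(",")}")
  }
}
```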
diff --git a/core/src/main/scala/kafka/utils/Throttler.scala b/core/src/main/scala/kafka/utils/Throttler.scala
index d1a144d7882..998ade14ef8 100644
--- a/core/src/main/scala/kafka/utils/Throttler.scala
+++ b/core/src/main/scala/kafka/utils/Throttler.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package kafka.utils;
+package kafka.utils
 
 import kafka.metrics.KafkaMetricsGroup
 import java.util.concurrent.TimeUnit
@@ -95,4 +95,4 @@ object Throttler {
       }
     }
   }
-}
\ No newline at end of file
+}
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 5685a1eddb2..1da8f90b3a7 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -498,7 +498,7 @@ object ZkUtils extends Logging {
     try {
       client.getChildren(path)
     } catch {
-      case e: ZkNoNodeException => return Nil
+      case e: ZkNoNodeException => Nil
       case e2: Throwable => throw e2
     }
   }
@@ -728,21 +728,19 @@ object ZkUtils extends Logging {
   def getSequenceId(client: ZkClient, path: String): Int = {
     try {
       val stat = client.writeDataReturnStat(path, "", -1)
-      return stat.getVersion
+      stat.getVersion
     } catch {
       case e: ZkNoNodeException => {
         createParentPath(client, BrokerSequenceIdPath)
         try {
           client.createPersistent(BrokerSequenceIdPath, "")
-          return 0
+          0
         } catch {
           case e: ZkNodeExistsException =>
             val stat = client.writeDataReturnStat(BrokerSequenceIdPath, "", -1)
-            return stat.getVersion
-          case e2: Throwable => throw e2
+            stat.getVersion
         }
       }
-      case e2: Throwable => throw e2
     }
   }
 
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index 35f4f4612c6..5c4cca653b3 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -71,7 +71,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
   def consumeWithBrokerFailures(numIters: Int) {
     val numRecords = 1000
     sendRecords(numRecords)
-    this.producers.map(_.close)
+    this.producers.foreach(_.close)
 
     var consumed = 0
     val consumer = this.consumers(0)
@@ -100,7 +100,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
   def seekAndCommitWithBrokerFailures(numIters: Int) {
     val numRecords = 1000
     sendRecords(numRecords)
-    this.producers.map(_.close)
+    this.producers.foreach(_.close)
 
     val consumer = this.consumers(0)
     consumer.subscribe(tp)
@@ -151,4 +151,4 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
     }
     futures.map(_.get)
   }
-}
\ No newline at end of file
+}
diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index 02d26277b2d..2bbd4c96f8c 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -44,7 +44,7 @@ trait IntegrationTestHarness extends KafkaServerTestHarness {
 
   override def generateConfigs() = {
     val cfgs = TestUtils.createBrokerConfigs(serverCount, zkConnect)
-    cfgs.map(_.putAll(serverConfig))
+    cfgs.foreach(_.putAll(serverConfig))
     cfgs.map(KafkaConfig.fromProps)
   }
 
@@ -70,8 +70,8 @@ trait IntegrationTestHarness extends KafkaServerTestHarness {
   }
 
   override def tearDown() {
-    producers.map(_.close())
-    consumers.map(_.close())
+    producers.foreach(_.close())
+    consumers.foreach(_.close())
     super.tearDown()
   }
 
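The `ConsumerBounceTest` and `IntegrationTestHarness` hunks above — and most of the `tearDown` hunks that follow — change `map` to `foreach` wherever the body runs purely for its side effect: `map` allocates a result collection nobody reads and misstates intent, while `foreach` returns `Unit`. A sketch with a stand-in producer class (hypothetical, not Kafka's API):

```scala
object SideEffects extends App {
  final class FakeProducer(val id: Int) {
    var closed = false
    def close(): Unit = { closed = true; println(s"producer $id closed") }
  }
  val producers = List(new FakeProducer(1), new FakeProducer(2))

  // `map(_.close())` would build and discard a useless List[Unit];
  // `foreach` says plainly that only the side effect matters.
  producers.foreach(_.close())
  println(producers.forall(_.closed))  // true
}
```

Note that `cfgs.map(KafkaConfig.fromProps)` in the same hunk correctly stays a `map`: there the transformed collection *is* the result.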
diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
index ab5d16c1d22..df5c6ba20f0 100755
--- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
@@ -55,8 +55,8 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness {
   }
 
   override def tearDown() {
-    servers.map(server => server.shutdown())
-    servers.map(server => CoreUtils.rm(server.config.logDirs))
+    servers.foreach(_.shutdown())
+    servers.foreach(server => CoreUtils.rm(server.config.logDirs))
     super.tearDown()
   }
 
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index 4b728a18a43..efb2f8e79b3 100755
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -301,7 +301,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     val serverConfigs = TestUtils.createBrokerConfigs(3, zkConnect, false).map(KafkaConfig.fromProps)
     // create the topic
     AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
-    val servers = serverConfigs.reverse.map(s => TestUtils.createServer(s))
+    val servers = serverConfigs.reverseMap(s => TestUtils.createServer(s))
     // broker 2 should be the leader since it was started first
     val currentLeader = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, partition, oldLeaderOpt = None).get
     // trigger preferred replica election
@@ -319,7 +319,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     val partition = 1
     // create brokers
     val serverConfigs = TestUtils.createBrokerConfigs(3, zkConnect, false).map(KafkaConfig.fromProps)
-    val servers = serverConfigs.reverse.map(s => TestUtils.createServer(s))
+    val servers = serverConfigs.reverseMap(s => TestUtils.createServer(s))
     // create the topic
     TestUtils.createTopic(zkClient, topic, partitionReplicaAssignment = expectedReplicaAssignment, servers = servers)
 
@@ -330,7 +330,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     try {
       // wait for the update metadata request to trickle to the brokers
       TestUtils.waitUntilTrue(() =>
-        activeServers.foldLeft(true)(_ && _.apis.metadataCache.getPartitionInfo(topic,partition).get.leaderIsrAndControllerEpoch.leaderAndIsr.isr.size != 3),
+        activeServers.forall(_.apis.metadataCache.getPartitionInfo(topic,partition).get.leaderIsrAndControllerEpoch.leaderAndIsr.isr.size != 3),
         "Topic test not created after timeout")
       assertEquals(0, partitionsRemaining.size)
       var partitionStateInfo = activeServers.head.apis.metadataCache.getPartitionInfo(topic,partition).get
@@ -346,11 +346,11 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
       leaderAfterShutdown = partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader
       assertEquals(0, leaderAfterShutdown)
 
-      assertTrue(servers.foldLeft(true)(_ && _.apis.metadataCache.getPartitionInfo(topic,partition).get.leaderIsrAndControllerEpoch.leaderAndIsr.leader == 0))
+      assertTrue(servers.forall(_.apis.metadataCache.getPartitionInfo(topic,partition).get.leaderIsrAndControllerEpoch.leaderAndIsr.leader == 0))
       partitionsRemaining = controller.shutdownBroker(0)
       assertEquals(1, partitionsRemaining.size)
       // leader doesn't change since all the replicas are shut down
-      assertTrue(servers.foldLeft(true)(_ && _.apis.metadataCache.getPartitionInfo(topic,partition).get.leaderIsrAndControllerEpoch.leaderAndIsr.leader == 0))
+      assertTrue(servers.forall(_.apis.metadataCache.getPartitionInfo(topic,partition).get.leaderIsrAndControllerEpoch.leaderAndIsr.leader == 0))
     }
     finally {
       servers.foreach(_.shutdown())
@@ -397,7 +397,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
       checkConfig(2*maxMessageSize, 2 * retentionMs)
     } finally {
       server.shutdown()
-      server.config.logDirs.map(CoreUtils.rm(_))
+      server.config.logDirs.foreach(CoreUtils.rm(_))
     }
   }
 
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index 61cc6028dd7..fa8ce259a28 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -54,7 +54,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
     // check if all replicas but the one that is shut down has deleted the log
     TestUtils.waitUntilTrue(() =>
       servers.filter(s => s.config.brokerId != follower.config.brokerId)
-        .foldLeft(true)((res, server) => res && server.getLogManager().getLog(topicAndPartition).isEmpty), "Replicas 0,1 have not deleted log.")
+        .forall(_.getLogManager().getLog(topicAndPartition).isEmpty), "Replicas 0,1 have not deleted log.")
     // ensure topic deletion is halted
     TestUtils.waitUntilTrue(() => ZkUtils.pathExists(zkClient, ZkUtils.getDeleteTopicPath(topic)),
       "Admin path /admin/delete_topic/test path deleted even when a follower replica is down")
@@ -104,8 +104,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
     // create the topic
     AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
     // wait until replica log is created on every broker
-    TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res, server) =>
-      res && server.getLogManager().getLog(topicAndPartition).isDefined),
+    TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager().getLog(topicAndPartition).isDefined),
       "Replicas for topic test not created.")
     val leaderIdOpt = ZkUtils.getLeaderForPartition(zkClient, topic, 0)
     assertTrue("Leader should exist for partition [test,0]", leaderIdOpt.isDefined)
@@ -155,7 +154,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
     TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers)
     // verify that new partition doesn't exist on any broker either
     TestUtils.waitUntilTrue(() =>
-      servers.foldLeft(true)((res, server) => res && server.getLogManager().getLog(newPartition).isEmpty),
+      servers.forall(_.getLogManager().getLog(newPartition).isEmpty),
       "Replica logs not for new partition [test,1] not deleted after delete topic is complete.")
     servers.foreach(_.shutdown())
   }
@@ -173,7 +172,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
     TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers)
     // verify that new partition doesn't exist on any broker either
     assertTrue("Replica logs not deleted after delete topic is complete",
-      servers.foldLeft(true)((res, server) => res && server.getLogManager().getLog(newPartition).isEmpty))
+      servers.forall(_.getLogManager().getLog(newPartition).isEmpty))
     servers.foreach(_.shutdown())
   }
 
@@ -192,7 +191,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
     val leaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000)
     assertTrue("New leader should be elected after re-creating topic test", leaderIdOpt.isDefined)
     // check if all replica logs are created
-    TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res, server) => res && server.getLogManager().getLog(topicAndPartition).isDefined),
+    TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager().getLog(topicAndPartition).isDefined),
       "Replicas for topic test not created.")
     servers.foreach(_.shutdown())
   }
 
@@ -207,8 +206,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
     // verify delete topic path for test2 is removed from zookeeper
     TestUtils.verifyTopicDeletion(zkClient, "test2", 1, servers)
     // verify that topic test is untouched
-    TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res, server) =>
-      res && server.getLogManager().getLog(topicAndPartition).isDefined),
+    TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager().getLog(topicAndPartition).isDefined),
       "Replicas for topic test not created")
     // test the topic path exists
     assertTrue("Topic test mistakenly deleted", ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic)))
@@ -267,8 +265,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
     // create the topic
     AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
     // wait until replica log is created on every broker
-    TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res, server) =>
-      res && server.getLogManager().getLog(topicAndPartition).isDefined),
+    TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager().getLog(topicAndPartition).isDefined),
       "Replicas for topic test not created")
     servers
   }
diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
index 447e4219f84..87c631573aa 100755
--- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
@@ -63,8 +63,8 @@ trait KafkaServerTestHarness extends JUnit3Suite with ZooKeeperTestHarness {
   }
 
   override def tearDown() {
-    servers.map(server => server.shutdown())
-    servers.map(server => server.config.logDirs.map(CoreUtils.rm(_)))
+    servers.foreach(_.shutdown())
+    servers.foreach(_.config.logDirs.foreach(CoreUtils.rm(_)))
     super.tearDown
   }
 
diff --git a/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala b/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala
index 1113619b7a9..12d0733f5ed 100755
--- a/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala
@@ -40,8 +40,8 @@ class RollingBounceTest extends JUnit3Suite with ZooKeeperTestHarness {
   }
 
   override def tearDown() {
-    servers.map(server => server.shutdown())
-    servers.map(server => CoreUtils.rm(server.config.logDirs))
+    servers.foreach(_.shutdown())
+    servers.foreach(server => CoreUtils.rm(server.config.logDirs))
     super.tearDown()
   }
 
diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
index 5b7b529fdff..e4bf2df48dd 100755
--- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
@@ -78,8 +78,8 @@ class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
   }
 
   override def tearDown() {
-    servers.map(server => shutdownServer(server))
-    servers.map(server => CoreUtils.rm(server.config.logDirs))
+    servers.foreach(server => shutdownServer(server))
+    servers.foreach(server => CoreUtils.rm(server.config.logDirs))
 
     // restore log levels
     kafkaApisLogger.setLevel(Level.ERROR)
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index 0a26f5f1549..01dfbc4f8d2 100755
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index 0a26f5f1549..01dfbc4f8d2 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -48,7 +48,7 @@ class LogManagerTest extends JUnit3Suite {
     if(logManager != null)
       logManager.shutdown()
     CoreUtils.rm(logDir)
-    logManager.logDirs.map(CoreUtils.rm(_))
+    logManager.logDirs.foreach(CoreUtils.rm(_))
     super.tearDown()
   }
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 069aa02ab66..76d3bfd378f 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -107,7 +107,7 @@ class LogTest extends JUnitSuite {
     time.sleep(log.config.segmentMs - maxJitter)
     log.append(set)
-    assertEquals("Log does not roll on this append because it occurs earlier than max jitter", 1, log.numberOfSegments);
+    assertEquals("Log does not roll on this append because it occurs earlier than max jitter", 1, log.numberOfSegments)
     time.sleep(maxJitter - log.activeSegment.rollJitterMs + 1)
     log.append(set)
     assertEquals("Log should roll after segmentMs adjusted by random jitter", 2, log.numberOfSegments)
@@ -302,7 +302,7 @@ class LogTest extends JUnitSuite {
     assertEquals("Still no change in the logEndOffset", currOffset, log.logEndOffset)
     assertEquals("Should still be able to append and should get the logEndOffset assigned to the new append",
                  currOffset,
-                 log.append(TestUtils.singleMessageSet("hello".toString.getBytes)).firstOffset)
+                 log.append(TestUtils.singleMessageSet("hello".getBytes)).firstOffset)
 
     // cleanup the log
     log.delete()
@@ -752,7 +752,7 @@ class LogTest extends JUnitSuite {
     val topic: String = "test_topic"
     val partition:String = "143"
     val dir: File = new File(logDir + topicPartitionName(topic, partition))
-    val topicAndPartition = Log.parseTopicPartitionName(dir);
+    val topicAndPartition = Log.parseTopicPartitionName(dir)
     assertEquals(topic, topicAndPartition.asTuple._1)
     assertEquals(partition.toInt, topicAndPartition.asTuple._2)
   }
@@ -761,7 +761,7 @@ class LogTest extends JUnitSuite {
   def testParseTopicPartitionNameForEmptyName() {
     try {
       val dir: File = new File("")
-      val topicAndPartition = Log.parseTopicPartitionName(dir);
+      val topicAndPartition = Log.parseTopicPartitionName(dir)
       fail("KafkaException should have been thrown for dir: " + dir.getCanonicalPath)
     } catch {
       case e: Exception => // its GOOD!
@@ -772,7 +772,7 @@ class LogTest extends JUnitSuite {
   def testParseTopicPartitionNameForNull() {
     try {
       val dir: File = null
-      val topicAndPartition = Log.parseTopicPartitionName(dir);
+      val topicAndPartition = Log.parseTopicPartitionName(dir)
       fail("KafkaException should have been thrown for dir: " + dir)
     } catch {
       case e: Exception => // its GOOD!
@@ -785,7 +785,7 @@ class LogTest extends JUnitSuite {
     val partition:String = "1999"
     val dir: File = new File(logDir + File.separator + topic + partition)
     try {
-      val topicAndPartition = Log.parseTopicPartitionName(dir);
+      val topicAndPartition = Log.parseTopicPartitionName(dir)
       fail("KafkaException should have been thrown for dir: " + dir.getCanonicalPath)
     } catch {
       case e: Exception => // its GOOD!
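One thing the parseTopicPartitionName hunks above do not touch: each test's failure message promises a KafkaException, but the `catch` accepts any `Exception`, so the tests would still pass if a different exception type escaped. A possible tightening, hypothetical and not part of this patch, assuming `kafka.common.KafkaException` is the type `Log.parseTopicPartitionName` documents:

    try {
      Log.parseTopicPartitionName(dir)
      fail("KafkaException should have been thrown for dir: " + dir)
    } catch {
      case e: KafkaException => // expected; any other exception now fails the test
    }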
@@ -798,7 +798,7 @@ class LogTest extends JUnitSuite {
     val partition:String = "1999"
     val dir: File = new File(logDir + topicPartitionName(topic, partition))
     try {
-      val topicAndPartition = Log.parseTopicPartitionName(dir);
+      val topicAndPartition = Log.parseTopicPartitionName(dir)
       fail("KafkaException should have been thrown for dir: " + dir.getCanonicalPath)
     } catch {
       case e: Exception => // its GOOD!
@@ -811,7 +811,7 @@ class LogTest extends JUnitSuite {
     val partition:String = ""
     val dir: File = new File(logDir + topicPartitionName(topic, partition))
     try {
-      val topicAndPartition = Log.parseTopicPartitionName(dir);
+      val topicAndPartition = Log.parseTopicPartitionName(dir)
       fail("KafkaException should have been thrown for dir: " + dir.getCanonicalPath)
     } catch {
       case e: Exception => // its GOOD!
diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
index 26572f7ff94..f1977d850a5 100755
--- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
@@ -49,8 +49,8 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
   }
 
   override def tearDown() {
-    servers.map(server => server.shutdown())
-    servers.map(server => CoreUtils.rm(server.config.logDirs))
+    servers.foreach(_.shutdown())
+    servers.foreach(server => CoreUtils.rm(server.config.logDirs))
     super.tearDown()
   }
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
index a67cc37d542..a3a03db88c4 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
@@ -65,8 +65,9 @@ class ReplicaFetchTest extends JUnit3Suite with ZooKeeperTestHarness {
       for (topic <- List(topic1, topic2)) {
         val topicAndPart = TopicAndPartition(topic, partition)
         val expectedOffset = brokers.head.getLogManager().getLog(topicAndPart).get.logEndOffset
-        result = result && expectedOffset > 0 && brokers.foldLeft(true) { (total, item) => total &&
-          (expectedOffset == item.getLogManager().getLog(topicAndPart).get.logEndOffset) }
+        result = result && expectedOffset > 0 && brokers.forall { item =>
+          (expectedOffset == item.getLogManager().getLog(topicAndPart).get.logEndOffset)
+        }
       }
       result
     }
diff --git a/core/src/test/scala/unit/kafka/utils/IteratorTemplateTest.scala b/core/src/test/scala/unit/kafka/utils/IteratorTemplateTest.scala
index 46a4e899ef2..fbd245cad0a 100644
--- a/core/src/test/scala/unit/kafka/utils/IteratorTemplateTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/IteratorTemplateTest.scala
@@ -22,7 +22,7 @@ import org.junit.{Test, After, Before}
 
 class IteratorTemplateTest extends Assertions {
 
-  val lst = (0 until 10).toSeq
+  val lst = (0 until 10)
  val iterator = new IteratorTemplate[Int]() {
    var i = 0
    override def makeNext() = {
@@ -54,4 +54,4 @@ class IteratorTemplateTest extends Assertions {
     }
   }
 
-}
\ No newline at end of file
+}
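On the IteratorTemplateTest change just above: `0 until 10` already produces a `Range`, and `Range` is an immutable `Seq[Int]`, so the `.toSeq` was a no-op conversion. One illustrative line that compiles without it:

    val lst: Seq[Int] = 0 until 10 // Range already extends Seq[Int]; no .toSeq needed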
diff --git a/core/src/test/scala/unit/kafka/utils/MockScheduler.scala b/core/src/test/scala/unit/kafka/utils/MockScheduler.scala
index c6740782813..eeafeda0422 100644
--- a/core/src/test/scala/unit/kafka/utils/MockScheduler.scala
+++ b/core/src/test/scala/unit/kafka/utils/MockScheduler.scala
@@ -81,10 +81,10 @@ case class MockTask(val name: String, val fun: () => Unit, var nextExecution: Lo
   def periodic = period >= 0
   def compare(t: MockTask): Int = {
     if(t.nextExecution == nextExecution)
-      return 0
+      0
     else if (t.nextExecution < nextExecution)
-      return -1
+      -1
     else
-      return 1
+      1
   }
-}
\ No newline at end of file
+}
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 8dc99b6b9fd..faae0e90759 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -396,7 +396,7 @@ object TestUtils extends Logging {
     producerProps.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs.toString)
     producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
     producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
-    return new KafkaProducer[Array[Byte],Array[Byte]](producerProps)
+    new KafkaProducer[Array[Byte],Array[Byte]](producerProps)
   }
 
   /**
@@ -417,7 +417,7 @@ object TestUtils extends Logging {
     consumerProps.put(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, "200")
     consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
     consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
-    return new KafkaConsumer[Array[Byte],Array[Byte]](consumerProps)
+    new KafkaConsumer[Array[Byte],Array[Byte]](consumerProps)
   }
 
   /**
@@ -457,9 +457,9 @@ object TestUtils extends Logging {
     new IteratorTemplate[Message] {
       override def makeNext(): Message = {
         if (iter.hasNext)
-          return iter.next.message
+          iter.next.message
         else
-          return allDone()
+          allDone()
       }
     }
   }
@@ -579,7 +579,7 @@ object TestUtils extends Logging {
       fail("Timing out after %d ms since leader is not elected or changed for partition [%s,%d]"
        .format(timeoutMs, topic, partition))
 
-    return leader
+    leader
   }
 
   /**
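The MockScheduler and TestUtils hunks above drop `return` because a Scala method's value is simply its last expression, and each `if`/`else` branch is itself a value. The keyword is redundant in plain methods like these, and actively costly inside function literals, where `return` compiles to a thrown `scala.runtime.NonLocalReturnControl`. A minimal sketch of the expression-oriented style (illustrative names, mirroring the `compare` change above):

    def compareTimes(a: Long, b: Long): Int =
      if (a == b) 0
      else if (a < b) -1
      else 1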
diff --git a/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala b/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala
index d42108eacf7..a2d062f7251 100644
--- a/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala
@@ -58,7 +58,7 @@ class ZKPathTest extends JUnit3Suite with ZooKeeperTestHarness {
       case exception: Throwable => fail("Failed to create persistent path")
     }
 
-    Assert.assertTrue("Failed to create persistent path", ZkUtils.pathExists(zkClient, path));
+    Assert.assertTrue("Failed to create persistent path", ZkUtils.pathExists(zkClient, path))
   }
 
   def testMakeSurePersistsPathExistsThrowsException {
@@ -88,7 +88,7 @@ class ZKPathTest extends JUnit3Suite with ZooKeeperTestHarness {
       case exception: Throwable => fail("Failed to create persistent path")
     }
 
-    Assert.assertTrue("Failed to create persistent path", ZkUtils.pathExists(zkClient, path));
+    Assert.assertTrue("Failed to create persistent path", ZkUtils.pathExists(zkClient, path))
   }
 
   def testCreateEphemeralPathThrowsException {
@@ -118,7 +118,7 @@ class ZKPathTest extends JUnit3Suite with ZooKeeperTestHarness {
       case exception: Throwable => fail("Failed to create ephemeral path")
     }
 
-    Assert.assertTrue("Failed to create ephemeral path", ZkUtils.pathExists(zkClient, path));
+    Assert.assertTrue("Failed to create ephemeral path", ZkUtils.pathExists(zkClient, path))
   }
 
   def testCreatePersistentSequentialThrowsException {
@@ -150,6 +150,6 @@ class ZKPathTest extends JUnit3Suite with ZooKeeperTestHarness {
       case exception: Throwable => fail("Failed to create persistent path")
     }
 
-    Assert.assertTrue("Failed to create persistent path", ZkUtils.pathExists(zkClient, actualPath));
+    Assert.assertTrue("Failed to create persistent path", ZkUtils.pathExists(zkClient, actualPath))
   }
 }
diff --git a/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java b/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java
index e5096f03bd3..c43b46144e6 100644
--- a/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java
+++ b/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java
@@ -68,7 +68,7 @@ public class SimpleConsumerDemo {
       .addFetch(KafkaProperties.topic2, 0, 0L, 100)
       .build();
     FetchResponse fetchResponse = simpleConsumer.fetch(req);
-    printMessages((ByteBufferMessageSet) fetchResponse.messageSet(KafkaProperties.topic2, 0));
+    printMessages(fetchResponse.messageSet(KafkaProperties.topic2, 0));
 
     System.out.println("Testing single multi-fetch");
     Map<String, List<Integer>> topicMap = new HashMap<String, List<Integer>>();
@@ -85,7 +85,7 @@ public class SimpleConsumerDemo {
       String topic = entry.getKey();
       for ( Integer offset : entry.getValue()) {
         System.out.println("Response from fetch request no: " + ++fetchReq);
-        printMessages((ByteBufferMessageSet) fetchResponse.messageSet(topic, offset));
+        printMessages(fetchResponse.messageSet(topic, offset));
       }
     }
   }
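Closing note on the SimpleConsumerDemo hunks: in the 0.8-era javaapi layer, `FetchResponse.messageSet(topic, partition)` is already declared to return `ByteBufferMessageSet`, so the removed `(ByteBufferMessageSet)` casts were no-ops; dropping them lets the compiler verify the type instead of deferring the check to runtime.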