mirror of https://github.com/apache/kafka.git
KAFKA-2140 Improve code readability; reviewed by Neha Narkhede
This commit is contained in:
parent 622e707554
commit ed1a548c50
@@ -12,13 +12,6 @@
*/
package org.apache.kafka.clients.consumer;

import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.ValidString.in;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceCallback;
import org.apache.kafka.common.config.AbstractConfig;

@@ -27,6 +20,13 @@ import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.serialization.Deserializer;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.ValidString.in;

/**
* The consumer configuration keys
*/

@@ -304,7 +304,7 @@ public class ConsumerConfig extends AbstractConfig {
return newProperties;
}

ConsumerConfig(Map<? extends Object, ? extends Object> props) {
ConsumerConfig(Map<?, ?> props) {
super(CONFIG, props);
}

@@ -12,15 +12,15 @@
*/
package org.apache.kafka.clients.consumer;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.AbstractIterator;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.AbstractIterator;

/**
* A container that holds the list {@link ConsumerRecord} per partition for a
* particular topic. There is one for every topic returned by a

@@ -55,7 +55,7 @@ public class ConsumerRecords<K, V> implements Iterable<ConsumerRecord<K, V>> {
throw new IllegalArgumentException("Topic must be non-null.");
List<List<ConsumerRecord<K, V>>> recs = new ArrayList<List<ConsumerRecord<K, V>>>();
for (Map.Entry<TopicPartition, List<ConsumerRecord<K, V>>> entry : records.entrySet()) {
if (entry.getKey().equals(topic))
if (entry.getKey().topic().equals(topic))
recs.add(entry.getValue());
}
return new ConcatenatedIterable<K, V>(recs);
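The ConsumerRecords hunk above is a behavioral fix rather than a pure readability change: `entry.getKey()` is a `TopicPartition`, so comparing it directly to the `topic` string can never be true and the old loop collected nothing. A minimal Scala sketch of the same pitfall, using a simplified stand-in for the real class:

// Hypothetical stand-in for org.apache.kafka.common.TopicPartition, for illustration only.
case class TopicPartition(topic: String, partition: Int)

val key = TopicPartition("test", 0)
println(key.equals("test"))        // false: a TopicPartition is never equal to a String
println(key.topic.equals("test"))  // true: compare the topic field instead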
@@ -242,7 +242,7 @@ public class ProducerConfig extends AbstractConfig {
return newProperties;
}

ProducerConfig(Map<? extends Object, ? extends Object> props) {
ProducerConfig(Map<?, ?> props) {
super(CONFIG, props);
}

@@ -58,7 +58,7 @@ public class Histogram {

@Override
public String toString() {
StringBuilder b = new StringBuilder('{');
StringBuilder b = new StringBuilder("{");
for (int i = 0; i < this.hist.length - 1; i++) {
b.append(String.format("%.10f", binScheme.fromBin(i)));
b.append(':');
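The Histogram hunk fixes a subtle Java pitfall rather than just style: `new StringBuilder('{')` resolves to the `StringBuilder(int capacity)` constructor because the char widens to an int, so the brace is used as an initial capacity and never appears in the output. A small Scala sketch against the JVM class, illustrative only:

val capacityCtor = new java.lang.StringBuilder('{'.toInt) // what Java resolves '{' to: capacity 123, empty contents
val literalCtor  = new java.lang.StringBuilder("{")       // contents actually start with "{"

println(capacityCtor.length) // 0
println(literalCtor.length)  // 1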
@@ -49,7 +49,7 @@ public enum SecurityProtocol {
}

public static String getName(int id) {
return CODE_TO_SECURITY_PROTOCOL.get(id).name;
return CODE_TO_SECURITY_PROTOCOL.get((short) id).name;
}

public static List<String> getNames() {
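The `(short) id` cast matters on the JVM: assuming `CODE_TO_SECURITY_PROTOCOL` is keyed by boxed `Short` (which the cast suggests), looking it up with an `int` autoboxes to `Integer`, and an `Integer` never equals a `Short`, so the lookup silently returns null. A small Scala sketch of the same boxing mismatch:

// Illustrative map keyed by boxed Short, mirroring what the cast in getName() implies.
val codeToName = new java.util.HashMap[java.lang.Short, String]()
codeToName.put(0.toShort, "PLAINTEXT")

println(codeToName.get(0))          // null: 0 boxes to java.lang.Integer, which never equals a Short key
println(codeToName.get(0.toShort))  // PLAINTEXT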
@@ -12,16 +12,16 @@
*/
package org.apache.kafka.common.config;

import static org.junit.Assert.fail;

import java.util.Map;
import java.util.Properties;

import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.junit.Test;

import java.util.Map;
import java.util.Properties;

import static org.junit.Assert.fail;

public class AbstractConfigTest {

@Test

@@ -73,7 +73,7 @@ public class AbstractConfigTest {
METRIC_REPORTER_CLASSES_DOC);
}

public TestConfig(Map<? extends Object, ? extends Object> props) {
public TestConfig(Map<?, ?> props) {
super(CONFIG, props);
}
}

@@ -43,8 +43,8 @@ public class ProtocolSerializationTest {
new Field("struct", new Schema(new Field("field", Type.INT32))));
this.struct = new Struct(this.schema).set("int8", (byte) 1)
.set("int16", (short) 1)
.set("int32", (int) 1)
.set("int64", (long) 1)
.set("int32", 1)
.set("int64", 1L)
.set("string", "1")
.set("bytes", "1".getBytes())
.set("array", new Object[] {1});

@@ -159,7 +159,7 @@ public class KafkaETLContext {
_response = _consumer.fetch(fetchRequest);
if(_response != null) {
_respIterator = new ArrayList<ByteBufferMessageSet>(){{
add((ByteBufferMessageSet) _response.messageSet(_request.getTopic(), _request.getPartition()));
add(_response.messageSet(_request.getTopic(), _request.getPartition()));
}}.iterator();
}
_requestTime += (System.currentTimeMillis() - tempTime);

@@ -127,7 +127,7 @@ object ReassignPartitionsCommand extends Logging {
}
val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, partitionsToBeReassigned.toMap)
// before starting assignment, output the current replica assignment to facilitate rollback
val currentPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, partitionsToBeReassigned.map(_._1.topic).toSeq)
val currentPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, partitionsToBeReassigned.map(_._1.topic))
println("Current partition replica assignment\n\n%s\n\nSave this to use as the --reassignment-json-file option during rollback"
.format(ZkUtils.getPartitionReassignmentZkData(currentPartitionReplicaAssignment)))
// start the reassignment

@@ -143,7 +143,7 @@ object TopicCommand {
topics.foreach { topic =>
try {
if (Topic.InternalTopics.contains(topic)) {
throw new AdminOperationException("Topic %s is a kafka internal topic and is not allowed to be marked for deletion.".format(topic));
throw new AdminOperationException("Topic %s is a kafka internal topic and is not allowed to be marked for deletion.".format(topic))
} else {
ZkUtils.createPersistentPath(zkClient, ZkUtils.getDeleteTopicPath(topic))
println("Topic %s is marked for deletion.".format(topic))

@@ -37,9 +37,9 @@ object ControlledShutdownRequest extends Logging {
}
}

case class ControlledShutdownRequest(val versionId: Short,
val correlationId: Int,
val brokerId: Int)
case class ControlledShutdownRequest(versionId: Short,
correlationId: Int,
brokerId: Int)
extends RequestOrResponse(Some(RequestKeys.ControlledShutdownKey)){

def this(correlationId: Int, brokerId: Int) =

@@ -74,4 +74,4 @@ case class ControlledShutdownRequest(val versionId: Short,
controlledShutdownRequest.append("; BrokerId: " + brokerId)
controlledShutdownRequest.toString()
}
}
}
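Many of the Scala hunks in this commit, starting with ControlledShutdownRequest above, drop a redundant `val` from case class parameters. Case class constructor parameters are already public, immutable vals, so the keyword adds noise without changing the generated class. A minimal sketch:

// The two declarations expose the same public, immutable fields.
case class WithVal(val versionId: Short, val correlationId: Int)
case class WithoutVal(versionId: Short, correlationId: Int)

val r = WithoutVal(1.toShort, 42)
println(r.versionId)      // 1: parameters are accessible as vals either way
// r.correlationId = 7    // would not compile: the field is immutable in both forms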
@@ -39,9 +39,9 @@ object ControlledShutdownResponse {
}


case class ControlledShutdownResponse(val correlationId: Int,
val errorCode: Short = ErrorMapping.NoError,
val partitionsRemaining: Set[TopicAndPartition])
case class ControlledShutdownResponse(correlationId: Int,
errorCode: Short = ErrorMapping.NoError,
partitionsRemaining: Set[TopicAndPartition])
extends RequestOrResponse() {
def sizeInBytes(): Int ={
var size =

@@ -68,4 +68,4 @@ case class ControlledShutdownResponse(val correlationId: Int,

override def describe(details: Boolean):String = { toString }

}
}

@@ -36,7 +36,7 @@ private[kafka] abstract class GenericRequestAndHeader(val versionId: Short,
2 /* version id */ +
4 /* correlation id */ +
(2 + clientId.length) /* client id */ +
body.sizeOf();
body.sizeOf()
}

override def toString(): String = {

@@ -52,4 +52,4 @@ private[kafka] abstract class GenericRequestAndHeader(val versionId: Short,
strBuffer.append("; Body: " + body.toString)
strBuffer.toString()
}
}
}

@@ -29,7 +29,7 @@ private[kafka] abstract class GenericResponseAndHeader(val correlationId: Int,

def sizeInBytes(): Int = {
4 /* correlation id */ +
body.sizeOf();
body.sizeOf()
}

override def toString(): String = {

@@ -43,4 +43,4 @@ private[kafka] abstract class GenericResponseAndHeader(val correlationId: Int,
strBuffer.append("; Body: " + body.toString)
strBuffer.toString()
}
}
}

@@ -59,8 +59,8 @@ object PartitionStateInfo {
}
}

case class PartitionStateInfo(val leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
val allReplicas: Set[Int]) {
case class PartitionStateInfo(leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
allReplicas: Set[Int]) {
def replicationFactor = allReplicas.size

def writeTo(buffer: ByteBuffer) {

@@ -200,4 +200,4 @@ case class LeaderAndIsrRequest (versionId: Short,
leaderAndIsrRequest.append(";PartitionState:" + partitionStateInfos.mkString(","))
leaderAndIsrRequest.toString()
}
}
}

@@ -42,9 +42,9 @@ object StopReplicaResponse {
}


case class StopReplicaResponse(val correlationId: Int,
val responseMap: Map[TopicAndPartition, Short],
val errorCode: Short = ErrorMapping.NoError)
case class StopReplicaResponse(correlationId: Int,
responseMap: Map[TopicAndPartition, Short],
errorCode: Short = ErrorMapping.NoError)
extends RequestOrResponse() {
def sizeInBytes(): Int ={
var size =

@@ -72,4 +72,4 @@ case class StopReplicaResponse(val correlationId: Int,
}

override def describe(details: Boolean):String = { toString }
}
}

@@ -109,7 +109,7 @@ object PartitionMetadata {
}

case class PartitionMetadata(partitionId: Int,
val leader: Option[BrokerEndPoint],
leader: Option[BrokerEndPoint],
replicas: Seq[BrokerEndPoint],
isr: Seq[BrokerEndPoint] = Seq.empty,
errorCode: Short = ErrorMapping.NoError) extends Logging {

@@ -46,10 +46,10 @@ object TopicMetadataRequest extends Logging {
}
}

case class TopicMetadataRequest(val versionId: Short,
val correlationId: Int,
val clientId: String,
val topics: Seq[String])
case class TopicMetadataRequest(versionId: Short,
correlationId: Int,
clientId: String,
topics: Seq[String])
extends RequestOrResponse(Some(RequestKeys.MetadataKey)){

def this(topics: Seq[String], correlationId: Int) =

@@ -93,4 +93,4 @@ case class TopicMetadataRequest(val versionId: Short,
topicMetadataRequest.append("; Topics: " + topics.mkString(","))
topicMetadataRequest.toString()
}
}
}

@@ -74,7 +74,7 @@ object ClientUtils extends Logging{
} else {
debug("Successfully fetched metadata for %d topic(s) %s".format(topics.size, topics))
}
return topicMetadataResponse
topicMetadataResponse
}

/**
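Several hunks in this commit, such as the ClientUtils one above, delete a trailing `return`. In Scala the last expression of a method body is its result, so `return topicMetadataResponse` and a bare `topicMetadataResponse` behave the same here; the explicit keyword is redundant and generally avoided. A minimal sketch:

// Both methods return the same value; the second form is the idiomatic one.
def withReturn(xs: Seq[Int]): Int = { return xs.size }
def lastExpression(xs: Seq[Int]): Int = { xs.size }

println(withReturn(Seq(1, 2, 3)))      // 3
println(lastExpression(Seq(1, 2, 3)))  // 3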
@@ -68,7 +68,7 @@ case class EndPoint(host: String, port: Int, protocolType: SecurityProtocol) {
def writeTo(buffer: ByteBuffer): Unit = {
buffer.putInt(port)
writeShortString(buffer, host)
buffer.putShort(protocolType.id.toShort)
buffer.putShort(protocolType.id)
}

def sizeInBytes: Int =

@@ -1,26 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kafka.common

/**
* Thrown when a request is made for broker but no brokers with that topic
* exist.
*/
class ConsumerRebalanceFailedException(message: String) extends RuntimeException(message) {
def this() = this(null)
}

@@ -309,8 +309,8 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging
}
updateMetadataRequestMap.clear()
stopReplicaRequestMap foreach { case(broker, replicaInfoList) =>
val stopReplicaWithDelete = replicaInfoList.filter(p => p.deletePartition == true).map(i => i.replica).toSet
val stopReplicaWithoutDelete = replicaInfoList.filter(p => p.deletePartition == false).map(i => i.replica).toSet
val stopReplicaWithDelete = replicaInfoList.filter(_.deletePartition).map(_.replica).toSet
val stopReplicaWithoutDelete = replicaInfoList.filterNot(_.deletePartition).map(_.replica).toSet
debug("The stop replica request (delete = true) sent to broker %d is %s"
.format(broker, stopReplicaWithDelete.mkString(",")))
debug("The stop replica request (delete = false) sent to broker %d is %s"
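The ControllerBrokerRequestBatch hunk replaces `filter(p => p.deletePartition == true)` with `filter(_.deletePartition)` and the negated form with `filterNot`: comparing a Boolean to a literal is redundant, and the field itself can serve as the predicate. A minimal sketch with a hypothetical stand-in for the controller's replica bookkeeping:

// Hypothetical ReplicaInfo, for illustration only.
case class ReplicaInfo(replica: Int, deletePartition: Boolean)
val infos = List(ReplicaInfo(1, true), ReplicaInfo(2, false), ReplicaInfo(3, true))

val withDelete    = infos.filter(_.deletePartition).map(_.replica).toSet     // Set(1, 3)
val withoutDelete = infos.filterNot(_.deletePartition).map(_.replica).toSet  // Set(2)
println(withDelete -> withoutDelete)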
@@ -647,7 +647,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt
*/
def startup() = {
inLock(controllerContext.controllerLock) {
info("Controller starting up");
info("Controller starting up")
registerSessionExpirationListener()
isRunning = true
controllerElector.startup

@@ -1326,7 +1326,7 @@ case class PartitionAndReplica(topic: String, partition: Int, replica: Int) {
}
}

case class LeaderIsrAndControllerEpoch(val leaderAndIsr: LeaderAndIsr, controllerEpoch: Int) {
case class LeaderIsrAndControllerEpoch(leaderAndIsr: LeaderAndIsr, controllerEpoch: Int) {
override def toString(): String = {
val leaderAndIsrInfo = new StringBuilder
leaderAndIsrInfo.append("(Leader:" + leaderAndIsr.leader)

@@ -282,7 +282,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
val replicasForTopic = controller.controllerContext.replicasForTopic(topic)
val replicaStatesForTopic = replicasForTopic.map(r => (r, replicaState(r))).toMap
debug("Are all replicas for topic %s deleted %s".format(topic, replicaStatesForTopic))
replicaStatesForTopic.foldLeft(true)((deletionState, r) => deletionState && r._2 == ReplicaDeletionSuccessful)
replicaStatesForTopic.forall(_._2 == ReplicaDeletionSuccessful)
}

def isAtLeastOneReplicaInDeletionStartedState(topic: String): Boolean = {
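A recurring pattern in this commit, here in ReplicaStateMachine and again in several tests below, is replacing `foldLeft(true)((acc, x) => acc && p(x))` with `forall(p)`. The two produce the same Boolean, but `forall` states the intent directly and stops at the first failing element instead of scanning the whole collection. A minimal sketch:

val states = Map("r1" -> "Successful", "r2" -> "Started", "r3" -> "Successful")

val viaFold   = states.foldLeft(true)((acc, kv) => acc && kv._2 == "Successful")
val viaForall = states.forall(_._2 == "Successful")  // same result, short-circuits on "Started"

println(viaFold == viaForall)  // true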
@@ -41,8 +41,7 @@ class DelayedRebalance(sessionTimeout: Long,

/* check if all known consumers have requested to re-join group */
override def tryComplete(): Boolean = {
allConsumersJoinedGroup.set(groupRegistry.memberRegistries.values.foldLeft
(true) ((agg, cur) => agg && cur.joinGroupReceived.get()))
allConsumersJoinedGroup.set(groupRegistry.memberRegistries.values.forall(_.joinGroupReceived.get()))

if (allConsumersJoinedGroup.get())
forceComplete()

@@ -47,9 +47,4 @@ private[javaapi] object Implicits extends Logging {
}
}

// used explicitly by ByteBufferMessageSet constructor as due to SI-4141 which affects Scala 2.8.1, implicits are not visible in constructors
implicit def javaListToScalaBuffer[A](l: java.util.List[A]) = {
import scala.collection.JavaConversions._
l: collection.mutable.Buffer[A]
}
}

@@ -19,14 +19,14 @@ package kafka.javaapi.message
import java.util.concurrent.atomic.AtomicLong
import java.nio.ByteBuffer
import kafka.message._
import kafka.javaapi.Implicits.javaListToScalaBuffer

import scala.collection.JavaConverters._

class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet {
private val underlying: kafka.message.ByteBufferMessageSet = new kafka.message.ByteBufferMessageSet(buffer)

def this(compressionCodec: CompressionCodec, messages: java.util.List[Message]) {
// due to SI-4141 which affects Scala 2.8.1, implicits are not visible in constructors and must be used explicitly
this(new kafka.message.ByteBufferMessageSet(compressionCodec, new AtomicLong(0), javaListToScalaBuffer(messages).toSeq : _*).buffer)
this(new kafka.message.ByteBufferMessageSet(compressionCodec, new AtomicLong(0), messages.asScala: _*).buffer)
}

def this(messages: java.util.List[Message]) {
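The javaapi hunks swap the hand-rolled `javaListToScalaBuffer` implicit (built on `scala.collection.JavaConversions`) for the explicit `scala.collection.JavaConverters` API: `messages.asScala` makes the Java-to-Scala wrapping visible at the call site instead of relying on an implicit conversion. A minimal sketch:

import scala.collection.JavaConverters._

val javaList = new java.util.ArrayList[String]()
javaList.add("m1")
javaList.add("m2")

// asScala wraps the Java list in a mutable.Buffer view; no copying, and the conversion is explicit.
val buffer: scala.collection.mutable.Buffer[String] = javaList.asScala
println(buffer.mkString(","))  // m1,m2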
@@ -29,13 +29,13 @@ package kafka.log
* @param enableCleaner Allows completely disabling the log cleaner
* @param hashAlgorithm The hash algorithm to use in key comparison.
*/
case class CleanerConfig(val numThreads: Int = 1,
val dedupeBufferSize: Long = 4*1024*1024L,
val dedupeBufferLoadFactor: Double = 0.9d,
val ioBufferSize: Int = 1024*1024,
val maxMessageSize: Int = 32*1024*1024,
val maxIoBytesPerSecond: Double = Double.MaxValue,
val backOffMs: Long = 15 * 1000,
val enableCleaner: Boolean = true,
val hashAlgorithm: String = "MD5") {
}
case class CleanerConfig(numThreads: Int = 1,
dedupeBufferSize: Long = 4*1024*1024L,
dedupeBufferLoadFactor: Double = 0.9d,
ioBufferSize: Int = 1024*1024,
maxMessageSize: Int = 32*1024*1024,
maxIoBytesPerSecond: Double = Double.MaxValue,
backOffMs: Long = 15 * 1000,
enableCleaner: Boolean = true,
hashAlgorithm: String = "MD5") {
}

@@ -72,7 +72,7 @@ class LogCleaner(val config: CleanerConfig,
time: Time = SystemTime) extends Logging with KafkaMetricsGroup {

/* for managing the state of partitions being cleaned. package-private to allow access in tests */
private[log] val cleanerManager = new LogCleanerManager(logDirs, logs);
private[log] val cleanerManager = new LogCleanerManager(logDirs, logs)

/* a throttle used to limit the I/O of all the cleaner threads to a user-specified maximum rate */
private val throttler = new Throttler(desiredRatePerSec = config.maxIoBytesPerSecond,

@@ -622,4 +622,4 @@ private case class LogToClean(topicPartition: TopicAndPartition, log: Log, first
val cleanableRatio = dirtyBytes / totalBytes.toDouble
def totalBytes = cleanBytes + dirtyBytes
override def compare(that: LogToClean): Int = math.signum(this.cleanableRatio - that.cleanableRatio).toInt
}
}
@@ -63,23 +63,23 @@ object Defaults {
* @param compressionType compressionType for a given topic
*
*/
case class LogConfig(val segmentSize: Int = Defaults.SegmentSize,
val segmentMs: Long = Defaults.SegmentMs,
val segmentJitterMs: Long = Defaults.SegmentJitterMs,
val flushInterval: Long = Defaults.FlushInterval,
val flushMs: Long = Defaults.FlushMs,
val retentionSize: Long = Defaults.RetentionSize,
val retentionMs: Long = Defaults.RetentionMs,
val maxMessageSize: Int = Defaults.MaxMessageSize,
val maxIndexSize: Int = Defaults.MaxIndexSize,
val indexInterval: Int = Defaults.IndexInterval,
val fileDeleteDelayMs: Long = Defaults.FileDeleteDelayMs,
val deleteRetentionMs: Long = Defaults.DeleteRetentionMs,
val minCleanableRatio: Double = Defaults.MinCleanableDirtyRatio,
val compact: Boolean = Defaults.Compact,
val uncleanLeaderElectionEnable: Boolean = Defaults.UncleanLeaderElectionEnable,
val minInSyncReplicas: Int = Defaults.MinInSyncReplicas,
val compressionType: String = Defaults.CompressionType) {
case class LogConfig(segmentSize: Int = Defaults.SegmentSize,
segmentMs: Long = Defaults.SegmentMs,
segmentJitterMs: Long = Defaults.SegmentJitterMs,
flushInterval: Long = Defaults.FlushInterval,
flushMs: Long = Defaults.FlushMs,
retentionSize: Long = Defaults.RetentionSize,
retentionMs: Long = Defaults.RetentionMs,
maxMessageSize: Int = Defaults.MaxMessageSize,
maxIndexSize: Int = Defaults.MaxIndexSize,
indexInterval: Int = Defaults.IndexInterval,
fileDeleteDelayMs: Long = Defaults.FileDeleteDelayMs,
deleteRetentionMs: Long = Defaults.DeleteRetentionMs,
minCleanableRatio: Double = Defaults.MinCleanableDirtyRatio,
compact: Boolean = Defaults.Compact,
uncleanLeaderElectionEnable: Boolean = Defaults.UncleanLeaderElectionEnable,
minInSyncReplicas: Int = Defaults.MinInSyncReplicas,
compressionType: String = Defaults.CompressionType) {

def toProps: Properties = {
val props = new Properties()
@@ -375,10 +375,10 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
if(Os.isWindows)
lock.lock()
try {
return fun
fun
} finally {
if(Os.isWindows)
lock.unlock()
}
}
}
}

@@ -69,7 +69,7 @@ class SkimpyOffsetMap(val memory: Int, val hashAlgorithm: String = "MD5") extend
/**
* The maximum number of entries this map can contain
*/
val slots: Int = (memory / bytesPerEntry).toInt
val slots: Int = memory / bytesPerEntry

/**
* Associate this offset to the given key.

@@ -177,4 +177,4 @@ class SkimpyOffsetMap(val memory: Int, val hashAlgorithm: String = "MD5") extend
digest.digest(buffer, 0, hashSize)
}

}
}

@@ -22,4 +22,4 @@ package kafka.log
* in some log file of the beginning of the message set entry with the
* given offset.
*/
case class OffsetPosition(val offset: Long, val position: Int)
case class OffsetPosition(offset: Long, position: Int)

@@ -43,7 +43,7 @@ object MessageSet {
var size = 0
val iter = messages.iterator
while(iter.hasNext) {
val message = iter.next.asInstanceOf[Message]
val message = iter.next
size += entrySize(message)
}
size
@@ -52,7 +52,7 @@ object KafkaMetricsReporter {

def startReporters (verifiableProps: VerifiableProperties) {
ReporterStarted synchronized {
if (ReporterStarted.get() == false) {
if (!ReporterStarted.get()) {
val metricsConfig = new KafkaMetricsConfig(verifiableProps)
if(metricsConfig.reporters.size > 0) {
metricsConfig.reporters.foreach(reporterType => {

@@ -87,7 +87,7 @@ class SocketServer(val brokerId: Int,
quotas,
connectionsMaxIdleMs,
portToProtocol)
Utils.newThread("kafka-network-thread-%d-%d".format(brokerId, i), processors(i), false).start();
Utils.newThread("kafka-network-thread-%d-%d".format(brokerId, i), processors(i), false).start()
}
}

@@ -244,7 +244,7 @@ private[kafka] class Acceptor(val host: String,
* Accept loop that checks for new connection attempts
*/
def run() {
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
serverChannel.register(selector, SelectionKey.OP_ACCEPT)
startupComplete()
var currentProcessor = 0
while(isRunning) {

@@ -480,7 +480,7 @@ private[kafka] class Processor(val id: Int,
key.attach(receive)
}
val read = receive.readFrom(socketChannel)
val address = socketChannel.socket.getRemoteSocketAddress();
val address = socketChannel.socket.getRemoteSocketAddress()
trace(read + " bytes read from " + address)
if(read < 0) {
close(key)
@@ -21,7 +21,7 @@ package kafka.producer
* A topic, key, and value.
* If a partition key is provided it will override the key for the purpose of partitioning but will not be stored.
*/
case class KeyedMessage[K, V](val topic: String, val key: K, val partKey: Any, val message: V) {
case class KeyedMessage[K, V](topic: String, key: K, partKey: Any, message: V) {
if(topic == null)
throw new IllegalArgumentException("Topic cannot be null.")

@@ -39,4 +39,4 @@ case class KeyedMessage[K, V](val topic: String, val key: K, val partKey: Any, v
}

def hasKey = key != null
}
}

@@ -72,7 +72,7 @@ class BrokerMetadataCheckpoint(val file: File) extends Logging {
}
} catch {
case e: FileNotFoundException =>
warn("No meta.properties file under dir %s".format(file.getAbsolutePath(), e.getMessage))
warn("No meta.properties file under dir %s".format(file.getAbsolutePath()))
None
case e1: Exception =>
error("Failed to read meta.properties file under dir %s due to %s".format(file.getAbsolutePath(), e1.getMessage))

@@ -458,7 +458,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
checkpoint.write(new BrokerMetadata(brokerId))
}

return brokerId
brokerId
}

private def generateBrokerId: Int = {

@@ -26,7 +26,7 @@ object LogOffsetMetadata {

class OffsetOrdering extends Ordering[LogOffsetMetadata] {
override def compare(x: LogOffsetMetadata , y: LogOffsetMetadata ): Int = {
return x.offsetDiff(y).toInt
x.offsetDiff(y).toInt
}
}

@@ -242,7 +242,7 @@ class ReplicaManager(val config: KafkaConfig,
def getReplicaOrException(topic: String, partition: Int): Replica = {
val replicaOpt = getReplica(topic, partition)
if(replicaOpt.isDefined)
return replicaOpt.get
replicaOpt.get
else
throw new ReplicaNotAvailableException("Replica %d is not available for partition [%s,%d]".format(config.brokerId, topic, partition))
}

@@ -69,7 +69,7 @@ object ConsoleConsumer extends Logging {
.withRequiredArg
.describedAs("prop")
.ofType(classOf[String])
val deleteConsumerOffsetsOpt = parser.accepts("delete-consumer-offsets", "If specified, the consumer path in zookeeper is deleted when starting up");
val deleteConsumerOffsetsOpt = parser.accepts("delete-consumer-offsets", "If specified, the consumer path in zookeeper is deleted when starting up")
val resetBeginningOpt = parser.accepts("from-beginning", "If the consumer does not already have an established offset to consume from, " +
"start with the earliest message present in the log rather than the latest message.")
val maxMessagesOpt = parser.accepts("max-messages", "The maximum number of messages to consume before exiting. If not set, consumption is continual.")

@@ -209,7 +209,7 @@ object ConsoleConsumer extends Logging {

def checkZkPathExists(zkUrl: String, path: String): Boolean = {
try {
val zk = new ZkClient(zkUrl, 30*1000,30*1000, ZKStringSerializer);
val zk = new ZkClient(zkUrl, 30*1000,30*1000, ZKStringSerializer)
zk.exists(path)
} catch {
case _: Throwable => false

@@ -74,7 +74,7 @@ object ConsoleProducer {

def getOldProducerProps(config: ProducerConfig): Properties = {

val props = new Properties;
val props = new Properties

props.putAll(config.extraProducerProps)

@@ -100,7 +100,7 @@ object ConsoleProducer {

def getNewProducerProps(config: ProducerConfig): Properties = {

val props = new Properties;
val props = new Properties

props.putAll(config.extraProducerProps)
@@ -114,11 +114,10 @@ object ExportZkOffsets extends Logging {
}
}

private def getBrokeridPartition(zkClient: ZkClient, consumerGroup: String, topic: String): List[String] = {
return ZkUtils.getChildrenParentMayNotExist(zkClient, "/consumers/%s/offsets/%s".format(consumerGroup, topic)).toList
}
private def getBrokeridPartition(zkClient: ZkClient, consumerGroup: String, topic: String): List[String] =
ZkUtils.getChildrenParentMayNotExist(zkClient, "/consumers/%s/offsets/%s".format(consumerGroup, topic)).toList

private def getTopicsList(zkClient: ZkClient, consumerGroup: String): List[String] = {
return ZkUtils.getChildren(zkClient, "/consumers/%s/offsets".format(consumerGroup)).toList
}
private def getTopicsList(zkClient: ZkClient, consumerGroup: String): List[String] =
ZkUtils.getChildren(zkClient, "/consumers/%s/offsets".format(consumerGroup)).toList

}

@@ -89,7 +89,7 @@ object ImportZkOffsets extends Logging {
s = br.readLine()
}

return partOffsetsMap
partOffsetsMap
}

private def updateZkOffsets(zkClient: ZkClient, partitionOffsets: Map[String,String]): Unit = {
@@ -103,7 +103,7 @@ object JmxTool extends Logging {

// print csv header
val keys = List("time") ++ queryAttributes(mbsc, names, attributesWhitelist).keys.toArray.sorted
if(keys.size == numExpectedAttributes.map(_._2).foldLeft(0)(_ + _) + 1)
if(keys.size == numExpectedAttributes.map(_._2).sum + 1)
println(keys.map("\"" + _ + "\"").mkString(","))

while(true) {

@@ -113,7 +113,7 @@ object JmxTool extends Logging {
case Some(dFormat) => dFormat.format(new Date)
case None => System.currentTimeMillis().toString
}
if(attributes.keySet.size == numExpectedAttributes.map(_._2).foldLeft(0)(_ + _) + 1)
if(attributes.keySet.size == numExpectedAttributes.map(_._2).sum + 1)
println(keys.map(attributes(_)).mkString(","))
val sleep = max(0, interval - (System.currentTimeMillis - start))
Thread.sleep(sleep)

@@ -137,4 +137,4 @@ object JmxTool extends Logging {
attributes
}

}
}
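`foldLeft(0)(_ + _)` over a numeric collection is exactly what `sum` does, so the JmxTool hunks shorten the attribute-count check without changing its value. A minimal sketch:

val expectedCounts = Seq(("HeapMemoryUsage", 4), ("ThreadCount", 1))

val viaFold = expectedCounts.map(_._2).foldLeft(0)(_ + _)
val viaSum  = expectedCounts.map(_._2).sum  // same total, clearer intent

println(viaFold == viaSum)  // true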
@@ -235,7 +235,7 @@ object ProducerPerformance extends Logging {

val seqMsgString = String.format("%1$-" + msgSize + "s", msgHeader).replace(' ', 'x')
debug(seqMsgString)
return seqMsgString.getBytes()
seqMsgString.getBytes()
}

private def generateProducerData(topic: String, messageId: Long): Array[Byte] = {

@@ -148,7 +148,7 @@ object SimpleConsumerShell extends Logging {
if(replicaId == UseLeaderReplica) {
replicaOpt = partitionMetadataOpt.get.leader
if(!replicaOpt.isDefined) {
System.err.println("Error: user speicifies to fetch from leader for partition (%s, %d) which has not been elected yet".format(replicaId, topic, partitionId))
System.err.println("Error: user specifies to fetch from leader for partition (%s, %d) which has not been elected yet".format(topic, partitionId))
System.exit(1)
}
}

@@ -79,9 +79,7 @@ object VerifyConsumerRebalance extends Logging {
val consumersPerTopicMap = ZkUtils.getConsumersPerTopic(zkClient, group, excludeInternalTopics = false)
val partitionsPerTopicMap = ZkUtils.getPartitionsForTopics(zkClient, consumersPerTopicMap.keySet.toSeq)

partitionsPerTopicMap.foreach { partitionsForTopic =>
val topic = partitionsForTopic._1
val partitions = partitionsForTopic._2
partitionsPerTopicMap.foreach { case (topic, partitions) =>
val topicDirs = new ZKGroupTopicDirs(group, topic)
info("Alive partitions for topic %s are %s ".format(topic, partitions.toString))
info("Alive consumers for topic %s => %s ".format(topic, consumersPerTopicMap.get(topic)))

@@ -95,8 +93,8 @@ object VerifyConsumerRebalance extends Logging {

// for each available partition for topic, check if an owner exists
partitions.foreach { partition =>
// check if there is a node for [partition]
if(!partitionsWithOwners.exists(p => p.equals(partition))) {
// check if there is a node for [partition]
if(!partitionsWithOwners.contains(partition.toString)) {
error("No owner for partition [%s,%d]".format(topic, partition))
rebalanceSucceeded = false
}

@@ -102,7 +102,7 @@ object CoreUtils extends Logging {
* Recursively delete the list of files/directories and any subfiles (if any exist)
* @param files sequence of files to be deleted
*/
def rm(files: Seq[String]): Unit = files.map(f => rm(new File(f)))
def rm(files: Seq[String]): Unit = files.foreach(f => rm(new File(f)))

/**
* Recursively delete the given file/directory and any subfiles (if any exist)

@@ -230,7 +230,7 @@ object CoreUtils extends Logging {
def createObject[T<:AnyRef](className: String, args: AnyRef*): T = {
val klass = Class.forName(className).asInstanceOf[Class[T]]
val constructor = klass.getConstructor(args.map(_.getClass): _*)
constructor.newInstance(args: _*).asInstanceOf[T]
constructor.newInstance(args: _*)
}

/**
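The change from `files.map(f => rm(new File(f)))` to `files.foreach(...)`, repeated in many test teardowns below, is about intent: `map` builds and returns a new collection of results, which is wasted work when the function is called only for its side effect, while `foreach` returns `Unit`. A minimal sketch:

val names = Seq("a.tmp", "b.tmp")

val mapped = names.map(n => println(s"deleting $n"))     // also allocates a List((), ()) that nobody uses
val walked = names.foreach(n => println(s"deleting $n")) // returns Unit; side effect only

println(mapped)  // List((), ())
println(walked)  // ()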
@@ -39,7 +39,7 @@ object Mx4jLoader extends Logging {
val address = props.getString("mx4jaddress", "0.0.0.0")
val port = props.getInt("mx4jport", 8082)
try {
debug("Will try to load MX4j now, if it's in the classpath");
debug("Will try to load MX4j now, if it's in the classpath")

val mbs = ManagementFactory.getPlatformMBeanServer()
val processorName = new ObjectName("Server:name=XSLTProcessor")

@@ -62,10 +62,10 @@ object Mx4jLoader extends Logging {
}
catch {
case e: ClassNotFoundException => {
info("Will not load MX4J, mx4j-tools.jar is not in the classpath");
info("Will not load MX4J, mx4j-tools.jar is not in the classpath")
}
case e: Throwable => {
warn("Could not start register mbean in JMX", e);
warn("Could not start register mbean in JMX", e)
}
}
false

@@ -15,7 +15,7 @@
* limitations under the License.
*/

package kafka.utils;
package kafka.utils

import kafka.metrics.KafkaMetricsGroup
import java.util.concurrent.TimeUnit

@@ -95,4 +95,4 @@ object Throttler {
}
}
}
}
}

@@ -498,7 +498,7 @@ object ZkUtils extends Logging {
try {
client.getChildren(path)
} catch {
case e: ZkNoNodeException => return Nil
case e: ZkNoNodeException => Nil
case e2: Throwable => throw e2
}
}

@@ -728,21 +728,19 @@ object ZkUtils extends Logging {
def getSequenceId(client: ZkClient, path: String): Int = {
try {
val stat = client.writeDataReturnStat(path, "", -1)
return stat.getVersion
stat.getVersion
} catch {
case e: ZkNoNodeException => {
createParentPath(client, BrokerSequenceIdPath)
try {
client.createPersistent(BrokerSequenceIdPath, "")
return 0
0
} catch {
case e: ZkNodeExistsException =>
val stat = client.writeDataReturnStat(BrokerSequenceIdPath, "", -1)
return stat.getVersion
case e2: Throwable => throw e2
stat.getVersion
}
}
case e2: Throwable => throw e2
}
}
@@ -71,7 +71,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
def consumeWithBrokerFailures(numIters: Int) {
val numRecords = 1000
sendRecords(numRecords)
this.producers.map(_.close)
this.producers.foreach(_.close)

var consumed = 0
val consumer = this.consumers(0)

@@ -100,7 +100,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
def seekAndCommitWithBrokerFailures(numIters: Int) {
val numRecords = 1000
sendRecords(numRecords)
this.producers.map(_.close)
this.producers.foreach(_.close)

val consumer = this.consumers(0)
consumer.subscribe(tp)

@@ -151,4 +151,4 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
}
futures.map(_.get)
}
}
}

@@ -44,7 +44,7 @@ trait IntegrationTestHarness extends KafkaServerTestHarness {

override def generateConfigs() = {
val cfgs = TestUtils.createBrokerConfigs(serverCount, zkConnect)
cfgs.map(_.putAll(serverConfig))
cfgs.foreach(_.putAll(serverConfig))
cfgs.map(KafkaConfig.fromProps)
}

@@ -70,8 +70,8 @@ trait IntegrationTestHarness extends KafkaServerTestHarness {
}

override def tearDown() {
producers.map(_.close())
consumers.map(_.close())
producers.foreach(_.close())
consumers.foreach(_.close())
super.tearDown()
}

@@ -55,8 +55,8 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness {
}

override def tearDown() {
servers.map(server => server.shutdown())
servers.map(server => CoreUtils.rm(server.config.logDirs))
servers.foreach(_.shutdown())
servers.foreach(server => CoreUtils.rm(server.config.logDirs))
super.tearDown()
}

@@ -301,7 +301,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
val serverConfigs = TestUtils.createBrokerConfigs(3, zkConnect, false).map(KafkaConfig.fromProps)
// create the topic
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
val servers = serverConfigs.reverse.map(s => TestUtils.createServer(s))
val servers = serverConfigs.reverseMap(s => TestUtils.createServer(s))
// broker 2 should be the leader since it was started first
val currentLeader = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, partition, oldLeaderOpt = None).get
// trigger preferred replica election

@@ -319,7 +319,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
val partition = 1
// create brokers
val serverConfigs = TestUtils.createBrokerConfigs(3, zkConnect, false).map(KafkaConfig.fromProps)
val servers = serverConfigs.reverse.map(s => TestUtils.createServer(s))
val servers = serverConfigs.reverseMap(s => TestUtils.createServer(s))
// create the topic
TestUtils.createTopic(zkClient, topic, partitionReplicaAssignment = expectedReplicaAssignment, servers = servers)

@@ -330,7 +330,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
try {
// wait for the update metadata request to trickle to the brokers
TestUtils.waitUntilTrue(() =>
activeServers.foldLeft(true)(_ && _.apis.metadataCache.getPartitionInfo(topic,partition).get.leaderIsrAndControllerEpoch.leaderAndIsr.isr.size != 3),
activeServers.forall(_.apis.metadataCache.getPartitionInfo(topic,partition).get.leaderIsrAndControllerEpoch.leaderAndIsr.isr.size != 3),
"Topic test not created after timeout")
assertEquals(0, partitionsRemaining.size)
var partitionStateInfo = activeServers.head.apis.metadataCache.getPartitionInfo(topic,partition).get

@@ -346,11 +346,11 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
leaderAfterShutdown = partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader
assertEquals(0, leaderAfterShutdown)

assertTrue(servers.foldLeft(true)(_ && _.apis.metadataCache.getPartitionInfo(topic,partition).get.leaderIsrAndControllerEpoch.leaderAndIsr.leader == 0))
assertTrue(servers.forall(_.apis.metadataCache.getPartitionInfo(topic,partition).get.leaderIsrAndControllerEpoch.leaderAndIsr.leader == 0))
partitionsRemaining = controller.shutdownBroker(0)
assertEquals(1, partitionsRemaining.size)
// leader doesn't change since all the replicas are shut down
assertTrue(servers.foldLeft(true)(_ && _.apis.metadataCache.getPartitionInfo(topic,partition).get.leaderIsrAndControllerEpoch.leaderAndIsr.leader == 0))
assertTrue(servers.forall(_.apis.metadataCache.getPartitionInfo(topic,partition).get.leaderIsrAndControllerEpoch.leaderAndIsr.leader == 0))
}
finally {
servers.foreach(_.shutdown())

@@ -397,7 +397,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
checkConfig(2*maxMessageSize, 2 * retentionMs)
} finally {
server.shutdown()
server.config.logDirs.map(CoreUtils.rm(_))
server.config.logDirs.foreach(CoreUtils.rm(_))
}
}

@@ -54,7 +54,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
// check if all replicas but the one that is shut down has deleted the log
TestUtils.waitUntilTrue(() =>
servers.filter(s => s.config.brokerId != follower.config.brokerId)
.foldLeft(true)((res, server) => res && server.getLogManager().getLog(topicAndPartition).isEmpty), "Replicas 0,1 have not deleted log.")
.forall(_.getLogManager().getLog(topicAndPartition).isEmpty), "Replicas 0,1 have not deleted log.")
// ensure topic deletion is halted
TestUtils.waitUntilTrue(() => ZkUtils.pathExists(zkClient, ZkUtils.getDeleteTopicPath(topic)),
"Admin path /admin/delete_topic/test path deleted even when a follower replica is down")

@@ -104,8 +104,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
// create the topic
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
// wait until replica log is created on every broker
TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res, server) =>
res && server.getLogManager().getLog(topicAndPartition).isDefined),
TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager().getLog(topicAndPartition).isDefined),
"Replicas for topic test not created.")
val leaderIdOpt = ZkUtils.getLeaderForPartition(zkClient, topic, 0)
assertTrue("Leader should exist for partition [test,0]", leaderIdOpt.isDefined)

@@ -155,7 +154,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers)
// verify that new partition doesn't exist on any broker either
TestUtils.waitUntilTrue(() =>
servers.foldLeft(true)((res, server) => res && server.getLogManager().getLog(newPartition).isEmpty),
servers.forall(_.getLogManager().getLog(newPartition).isEmpty),
"Replica logs not for new partition [test,1] not deleted after delete topic is complete.")
servers.foreach(_.shutdown())
}

@@ -173,7 +172,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers)
// verify that new partition doesn't exist on any broker either
assertTrue("Replica logs not deleted after delete topic is complete",
servers.foldLeft(true)((res, server) => res && server.getLogManager().getLog(newPartition).isEmpty))
servers.forall(_.getLogManager().getLog(newPartition).isEmpty))
servers.foreach(_.shutdown())
}

@@ -192,7 +191,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
val leaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000)
assertTrue("New leader should be elected after re-creating topic test", leaderIdOpt.isDefined)
// check if all replica logs are created
TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res, server) => res && server.getLogManager().getLog(topicAndPartition).isDefined),
TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager().getLog(topicAndPartition).isDefined),
"Replicas for topic test not created.")
servers.foreach(_.shutdown())
}

@@ -207,8 +206,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
// verify delete topic path for test2 is removed from zookeeper
TestUtils.verifyTopicDeletion(zkClient, "test2", 1, servers)
// verify that topic test is untouched
TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res, server) =>
res && server.getLogManager().getLog(topicAndPartition).isDefined),
TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager().getLog(topicAndPartition).isDefined),
"Replicas for topic test not created")
// test the topic path exists
assertTrue("Topic test mistakenly deleted", ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic)))

@@ -267,8 +265,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
// create the topic
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
// wait until replica log is created on every broker
TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res, server) =>
res && server.getLogManager().getLog(topicAndPartition).isDefined),
TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager().getLog(topicAndPartition).isDefined),
"Replicas for topic test not created")
servers
}
@@ -63,8 +63,8 @@ trait KafkaServerTestHarness extends JUnit3Suite with ZooKeeperTestHarness {
}

override def tearDown() {
servers.map(server => server.shutdown())
servers.map(server => server.config.logDirs.map(CoreUtils.rm(_)))
servers.foreach(_.shutdown())
servers.foreach(_.config.logDirs.foreach(CoreUtils.rm(_)))
super.tearDown
}

@@ -40,8 +40,8 @@ class RollingBounceTest extends JUnit3Suite with ZooKeeperTestHarness {
}

override def tearDown() {
servers.map(server => server.shutdown())
servers.map(server => CoreUtils.rm(server.config.logDirs))
servers.foreach(_.shutdown())
servers.foreach(server => CoreUtils.rm(server.config.logDirs))
super.tearDown()
}

@@ -78,8 +78,8 @@ class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
}

override def tearDown() {
servers.map(server => shutdownServer(server))
servers.map(server => CoreUtils.rm(server.config.logDirs))
servers.foreach(server => shutdownServer(server))
servers.foreach(server => CoreUtils.rm(server.config.logDirs))

// restore log levels
kafkaApisLogger.setLevel(Level.ERROR)

@@ -48,7 +48,7 @@ class LogManagerTest extends JUnit3Suite {
if(logManager != null)
logManager.shutdown()
CoreUtils.rm(logDir)
logManager.logDirs.map(CoreUtils.rm(_))
logManager.logDirs.foreach(CoreUtils.rm(_))
super.tearDown()
}
@@ -107,7 +107,7 @@ class LogTest extends JUnitSuite {

time.sleep(log.config.segmentMs - maxJitter)
log.append(set)
assertEquals("Log does not roll on this append because it occurs earlier than max jitter", 1, log.numberOfSegments);
assertEquals("Log does not roll on this append because it occurs earlier than max jitter", 1, log.numberOfSegments)
time.sleep(maxJitter - log.activeSegment.rollJitterMs + 1)
log.append(set)
assertEquals("Log should roll after segmentMs adjusted by random jitter", 2, log.numberOfSegments)

@@ -302,7 +302,7 @@ class LogTest extends JUnitSuite {
assertEquals("Still no change in the logEndOffset", currOffset, log.logEndOffset)
assertEquals("Should still be able to append and should get the logEndOffset assigned to the new append",
currOffset,
log.append(TestUtils.singleMessageSet("hello".toString.getBytes)).firstOffset)
log.append(TestUtils.singleMessageSet("hello".getBytes)).firstOffset)

// cleanup the log
log.delete()

@@ -752,7 +752,7 @@ class LogTest extends JUnitSuite {
val topic: String = "test_topic"
val partition:String = "143"
val dir: File = new File(logDir + topicPartitionName(topic, partition))
val topicAndPartition = Log.parseTopicPartitionName(dir);
val topicAndPartition = Log.parseTopicPartitionName(dir)
assertEquals(topic, topicAndPartition.asTuple._1)
assertEquals(partition.toInt, topicAndPartition.asTuple._2)
}

@@ -761,7 +761,7 @@ class LogTest extends JUnitSuite {
def testParseTopicPartitionNameForEmptyName() {
try {
val dir: File = new File("")
val topicAndPartition = Log.parseTopicPartitionName(dir);
val topicAndPartition = Log.parseTopicPartitionName(dir)
fail("KafkaException should have been thrown for dir: " + dir.getCanonicalPath)
} catch {
case e: Exception => // its GOOD!

@@ -772,7 +772,7 @@ class LogTest extends JUnitSuite {
def testParseTopicPartitionNameForNull() {
try {
val dir: File = null
val topicAndPartition = Log.parseTopicPartitionName(dir);
val topicAndPartition = Log.parseTopicPartitionName(dir)
fail("KafkaException should have been thrown for dir: " + dir)
} catch {
case e: Exception => // its GOOD!

@@ -785,7 +785,7 @@ class LogTest extends JUnitSuite {
val partition:String = "1999"
val dir: File = new File(logDir + File.separator + topic + partition)
try {
val topicAndPartition = Log.parseTopicPartitionName(dir);
val topicAndPartition = Log.parseTopicPartitionName(dir)
fail("KafkaException should have been thrown for dir: " + dir.getCanonicalPath)
} catch {
case e: Exception => // its GOOD!

@@ -798,7 +798,7 @@ class LogTest extends JUnitSuite {
val partition:String = "1999"
val dir: File = new File(logDir + topicPartitionName(topic, partition))
try {
val topicAndPartition = Log.parseTopicPartitionName(dir);
val topicAndPartition = Log.parseTopicPartitionName(dir)
fail("KafkaException should have been thrown for dir: " + dir.getCanonicalPath)
} catch {
case e: Exception => // its GOOD!

@@ -811,7 +811,7 @@ class LogTest extends JUnitSuite {
val partition:String = ""
val dir: File = new File(logDir + topicPartitionName(topic, partition))
try {
val topicAndPartition = Log.parseTopicPartitionName(dir);
val topicAndPartition = Log.parseTopicPartitionName(dir)
fail("KafkaException should have been thrown for dir: " + dir.getCanonicalPath)
} catch {
case e: Exception => // its GOOD!
@@ -49,8 +49,8 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
}

override def tearDown() {
servers.map(server => server.shutdown())
servers.map(server => CoreUtils.rm(server.config.logDirs))
servers.foreach(_.shutdown())
servers.foreach(server => CoreUtils.rm(server.config.logDirs))
super.tearDown()
}

@@ -65,8 +65,9 @@ class ReplicaFetchTest extends JUnit3Suite with ZooKeeperTestHarness {
for (topic <- List(topic1, topic2)) {
val topicAndPart = TopicAndPartition(topic, partition)
val expectedOffset = brokers.head.getLogManager().getLog(topicAndPart).get.logEndOffset
result = result && expectedOffset > 0 && brokers.foldLeft(true) { (total, item) => total &&
(expectedOffset == item.getLogManager().getLog(topicAndPart).get.logEndOffset) }
result = result && expectedOffset > 0 && brokers.forall { item =>
(expectedOffset == item.getLogManager().getLog(topicAndPart).get.logEndOffset)
}
}
result
}

@@ -22,7 +22,7 @@ import org.junit.{Test, After, Before}

class IteratorTemplateTest extends Assertions {

val lst = (0 until 10).toSeq
val lst = (0 until 10)
val iterator = new IteratorTemplate[Int]() {
var i = 0
override def makeNext() = {

@@ -54,4 +54,4 @@ class IteratorTemplateTest extends Assertions {
}
}

}
}
@@ -81,10 +81,10 @@ case class MockTask(val name: String, val fun: () => Unit, var nextExecution: Lo
def periodic = period >= 0
def compare(t: MockTask): Int = {
if(t.nextExecution == nextExecution)
return 0
0
else if (t.nextExecution < nextExecution)
return -1
-1
else
return 1
1
}
}
}

@@ -396,7 +396,7 @@ object TestUtils extends Logging {
producerProps.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs.toString)
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
return new KafkaProducer[Array[Byte],Array[Byte]](producerProps)
new KafkaProducer[Array[Byte],Array[Byte]](producerProps)
}

/**

@@ -417,7 +417,7 @@ object TestUtils extends Logging {
consumerProps.put(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, "200")
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
return new KafkaConsumer[Array[Byte],Array[Byte]](consumerProps)
new KafkaConsumer[Array[Byte],Array[Byte]](consumerProps)
}

/**

@@ -457,9 +457,9 @@ object TestUtils extends Logging {
new IteratorTemplate[Message] {
override def makeNext(): Message = {
if (iter.hasNext)
return iter.next.message
iter.next.message
else
return allDone()
allDone()
}
}
}

@@ -579,7 +579,7 @@ object TestUtils extends Logging {
fail("Timing out after %d ms since leader is not elected or changed for partition [%s,%d]"
.format(timeoutMs, topic, partition))

return leader
leader
}

/**
@@ -58,7 +58,7 @@ class ZKPathTest extends JUnit3Suite with ZooKeeperTestHarness {
case exception: Throwable => fail("Failed to create persistent path")
}

Assert.assertTrue("Failed to create persistent path", ZkUtils.pathExists(zkClient, path));
Assert.assertTrue("Failed to create persistent path", ZkUtils.pathExists(zkClient, path))
}

def testMakeSurePersistsPathExistsThrowsException {

@@ -88,7 +88,7 @@ class ZKPathTest extends JUnit3Suite with ZooKeeperTestHarness {
case exception: Throwable => fail("Failed to create persistent path")
}

Assert.assertTrue("Failed to create persistent path", ZkUtils.pathExists(zkClient, path));
Assert.assertTrue("Failed to create persistent path", ZkUtils.pathExists(zkClient, path))
}

def testCreateEphemeralPathThrowsException {

@@ -118,7 +118,7 @@ class ZKPathTest extends JUnit3Suite with ZooKeeperTestHarness {
case exception: Throwable => fail("Failed to create ephemeral path")
}

Assert.assertTrue("Failed to create ephemeral path", ZkUtils.pathExists(zkClient, path));
Assert.assertTrue("Failed to create ephemeral path", ZkUtils.pathExists(zkClient, path))
}

def testCreatePersistentSequentialThrowsException {

@@ -150,6 +150,6 @@ class ZKPathTest extends JUnit3Suite with ZooKeeperTestHarness {
case exception: Throwable => fail("Failed to create persistent path")
}

Assert.assertTrue("Failed to create persistent path", ZkUtils.pathExists(zkClient, actualPath));
Assert.assertTrue("Failed to create persistent path", ZkUtils.pathExists(zkClient, actualPath))
}
}
@@ -68,7 +68,7 @@ public class SimpleConsumerDemo {
.addFetch(KafkaProperties.topic2, 0, 0L, 100)
.build();
FetchResponse fetchResponse = simpleConsumer.fetch(req);
printMessages((ByteBufferMessageSet) fetchResponse.messageSet(KafkaProperties.topic2, 0));
printMessages(fetchResponse.messageSet(KafkaProperties.topic2, 0));

System.out.println("Testing single multi-fetch");
Map<String, List<Integer>> topicMap = new HashMap<String, List<Integer>>();

@@ -85,7 +85,7 @@ public class SimpleConsumerDemo {
String topic = entry.getKey();
for ( Integer offset : entry.getValue()) {
System.out.println("Response from fetch request no: " + ++fetchReq);
printMessages((ByteBufferMessageSet) fetchResponse.messageSet(topic, offset));
printMessages(fetchResponse.messageSet(topic, offset));
}
}
}