From a888a9d09e4fc219ab9d8536034ac043093f17df Mon Sep 17 00:00:00 2001 From: Neha Narkhede Date: Thu, 9 Feb 2012 22:04:32 +0000 Subject: [PATCH] KAFKA-262 Bug in the consumer rebalancing logic causes one consumer to release partitions that it does not own; patched by Neha Narkhede; reviewed by Jun Rao git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/trunk@1242552 13f79535-47bb-0310-9956-ffa450edef68 --- .../consumer/ZookeeperConsumerConnector.scala | 83 ++++--------------- .../kafka/message/CompressionUtils.scala | 2 +- .../kafka/producer/KafkaLog4jAppender.scala | 6 +- .../kafka/tools/VerifyConsumerRebalance.scala | 8 +- .../scala/other/kafka/TestKafkaAppender.scala | 2 +- .../other/kafka/TestLinearWriteSpeed.scala | 2 - .../other/kafka/TestLogPerformance.scala | 1 - .../BackwardsCompatibilityTest.scala | 3 +- .../integration/KafkaServerTestHarness.scala | 5 -- .../message/BaseMessageSetTestCases.scala | 1 - .../message/ByteBufferMessageSetTest.scala | 1 - .../kafka/javaapi/producer/ProducerTest.scala | 3 +- .../javaapi/producer/SyncProducerTest.scala | 4 +- .../test/scala/unit/kafka/log/LogTest.scala | 3 +- .../kafka/log4j/KafkaLog4jAppenderTest.scala | 3 +- .../message/BaseMessageSetTestCases.scala | 1 - .../kafka/message/FileMessageSetTest.scala | 3 - .../unit/kafka/message/MessageTest.scala | 3 - .../unit/kafka/network/SocketServerTest.scala | 4 - .../kafka/producer/AsyncProducerTest.scala | 2 +- .../unit/kafka/producer/ProducerTest.scala | 3 +- .../kafka/producer/SyncProducerTest.scala | 4 +- .../kafka/server/ServerShutdownTest.scala | 4 +- 23 files changed, 37 insertions(+), 114 deletions(-) diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index 221d2a56e9a..cce2e2d4b04 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -22,6 +22,7 @@ import java.util.concurrent.atomic._ import scala.collection._ import kafka.cluster._ import kafka.utils._ +import mutable.ListBuffer import org.I0Itec.zkclient.exception.ZkNodeExistsException import java.net.InetAddress import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener, ZkClient} @@ -91,7 +92,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, private val rebalanceLock = new Object private var fetcher: Option[Fetcher] = None private var zkClient: ZkClient = null - private val topicRegistry = new Pool[String, Pool[Partition, PartitionTopicInfo]] + private var topicRegistry = new Pool[String, Pool[Partition, PartitionTopicInfo]] // queues : (topic,consumerThreadId) -> queue private val queues = new Pool[Tuple2[String,String], BlockingQueue[FetchedDataChunk]] private val scheduler = new KafkaScheduler(1, "Kafka-consumer-autocommit-", false) @@ -373,8 +374,6 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, kafkaMessageStreams: Map[String,List[KafkaMessageStream[T]]]) extends IZkChildListener { private val dirs = new ZKGroupDirs(group) - private var oldPartitionsPerTopicMap: mutable.Map[String,List[String]] = new mutable.HashMap[String,List[String]]() - private var oldConsumersPerTopicMap: mutable.Map[String,List[String]] = new mutable.HashMap[String,List[String]]() @throws(classOf[Exception]) def handleChildChange(parentPath : String, curChilds : java.util.List[String]) { @@ -390,25 +389,12 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, deletePath(zkClient, znode) debug("Consumer " + consumerIdString + " releasing " + znode) } + topicRegistry.remove(topic) } } - private def getRelevantTopicMap(myTopicThreadIdsMap: Map[String, Set[String]], - newPartMap: Map[String,List[String]], - oldPartMap: Map[String,List[String]], - newConsumerMap: Map[String,List[String]], - oldConsumerMap: Map[String,List[String]]): Map[String, Set[String]] = { - var relevantTopicThreadIdsMap = new mutable.HashMap[String, Set[String]]() - for ( (topic, consumerThreadIdSet) <- myTopicThreadIdsMap ) - if ( oldPartMap.get(topic) != newPartMap.get(topic) || oldConsumerMap.get(topic) != newConsumerMap.get(topic)) - relevantTopicThreadIdsMap += (topic -> consumerThreadIdSet) - relevantTopicThreadIdsMap - } - def resetState() { topicRegistry.clear - oldConsumersPerTopicMap.clear - oldPartitionsPerTopicMap.clear } def syncedRebalance() { @@ -437,11 +423,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, /* Here the cache is at a risk of being stale. To take future rebalancing decisions correctly, we should * clear the cache */ info("Rebalancing attempt failed. Clearing the cache before the next rebalancing operation is triggered") - oldConsumersPerTopicMap.clear() - oldPartitionsPerTopicMap.clear() } - // commit offsets - commitOffsets() // stop all fetchers and clear all the queues to avoid data duplication closeFetchersForQueues(cluster, kafkaMessageStreams, queues.map(q => q._2)) // release all partitions, reset state and retry @@ -457,14 +439,6 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, val myTopicThreadIdsMap = getTopicCount(zkClient, group, consumerIdString).getConsumerThreadIdsPerTopic val consumersPerTopicMap = getConsumersPerTopic(zkClient, group) val partitionsPerTopicMap = getPartitionsForTopics(zkClient, myTopicThreadIdsMap.keys.iterator) - val relevantTopicThreadIdsMap = getRelevantTopicMap(myTopicThreadIdsMap, partitionsPerTopicMap, oldPartitionsPerTopicMap, consumersPerTopicMap, oldConsumersPerTopicMap) - if (relevantTopicThreadIdsMap.size <= 0) { - info("Consumer %s with %s and topic partitions %s doesn't need to rebalance.". - format(consumerIdString, consumersPerTopicMap, partitionsPerTopicMap)) - debug("Partitions per topic cache " + oldPartitionsPerTopicMap) - debug("Consumers per topic cache " + oldConsumersPerTopicMap) - return true - } /** * fetchers must be stopped to avoid data duplication, since if the current @@ -472,14 +446,15 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, * But if we don't stop the fetchers first, this consumer would continue returning data for released * partitions in parallel. So, not stopping the fetchers leads to duplicate data. */ - closeFetchers(cluster, kafkaMessageStreams, relevantTopicThreadIdsMap) + closeFetchers(cluster, kafkaMessageStreams, myTopicThreadIdsMap) releasePartitionOwnership() var partitionOwnershipDecision = new collection.mutable.HashMap[(String, String), String]() - for ((topic, consumerThreadIdSet) <- relevantTopicThreadIdsMap) { - topicRegistry.remove(topic) - topicRegistry.put(topic, new Pool[Partition, PartitionTopicInfo]) + var currentTopicRegistry = new Pool[String, Pool[Partition, PartitionTopicInfo]] + + for ((topic, consumerThreadIdSet) <- myTopicThreadIdsMap) { + currentTopicRegistry.put(topic, new Pool[Partition, PartitionTopicInfo]) val topicDirs = new ZKGroupTopicDirs(group, topic) val curConsumers = consumersPerTopicMap.get(topic).get @@ -507,11 +482,9 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, for (i <- startPart until startPart + nParts) { val partition = curPartitions(i) info(consumerThreadId + " attempting to claim partition " + partition) - val ownPartition = processPartition(topicDirs, partition, topic, consumerThreadId) - if (!ownPartition) - return false - else // record the partition ownership decision - partitionOwnershipDecision += ((topic, partition) -> consumerThreadId) + addPartitionTopicInfo(currentTopicRegistry, topicDirs, partition, topic, consumerThreadId) + // record the partition ownership decision + partitionOwnershipDecision += ((topic, partition) -> consumerThreadId) } } } @@ -525,8 +498,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, info("Updating the cache") debug("Partitions per topic cache " + partitionsPerTopicMap) debug("Consumers per topic cache " + consumersPerTopicMap) - oldPartitionsPerTopicMap = partitionsPerTopicMap - oldConsumersPerTopicMap = consumersPerTopicMap + topicRegistry = currentTopicRegistry updateFetcher(cluster, kafkaMessageStreams) true }else @@ -579,27 +551,6 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, } } - private def processPartition(topicDirs: ZKGroupTopicDirs, partition: String, - topic: String, consumerThreadId: String) : Boolean = { - val partitionOwnerPath = topicDirs.consumerOwnerDir + "/" + partition - // check if some other consumer owns this partition at this time - val currentPartitionOwner = readDataMaybeNull(zkClient, partitionOwnerPath) - if(currentPartitionOwner != null) { - if(currentPartitionOwner.equals(consumerThreadId)) { - info(partitionOwnerPath + " exists with value " + currentPartitionOwner + " during connection loss; this is ok") - addPartitionTopicInfo(topicDirs, partition, topic, consumerThreadId) - true - } - else { - info(partitionOwnerPath + " exists with value " + currentPartitionOwner) - false - } - } else { - addPartitionTopicInfo(topicDirs, partition, topic, consumerThreadId) - true - } - } - private def reflectPartitionOwnershipDecision(partitionOwnershipDecision: Map[(String, String), String]): Boolean = { val partitionOwnershipSuccessful = partitionOwnershipDecision.map { partitionOwner => val topic = partitionOwner._1._1 @@ -620,15 +571,17 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, case e2 => throw e2 } } - val success = partitionOwnershipSuccessful.foldLeft(0)((sum, decision) => if(decision) 0 else 1) - if(success > 0) false /* even if one of the partition ownership attempt has failed, return false */ + val hasPartitionOwnershipFailed = partitionOwnershipSuccessful.foldLeft(0)((sum, decision) => sum + (if(decision) 0 else 1)) + /* even if one of the partition ownership attempt has failed, return false */ + if(hasPartitionOwnershipFailed > 0) false else true } - private def addPartitionTopicInfo(topicDirs: ZKGroupTopicDirs, partitionString: String, + private def addPartitionTopicInfo(currentTopicRegistry: Pool[String, Pool[Partition, PartitionTopicInfo]], + topicDirs: ZKGroupTopicDirs, partitionString: String, topic: String, consumerThreadId: String) { val partition = Partition.parse(partitionString) - val partTopicInfoMap = topicRegistry.get(topic) + val partTopicInfoMap = currentTopicRegistry.get(topic) val znode = topicDirs.consumerOffsetDir + "/" + partition.name val offsetString = readDataMaybeNull(zkClient, znode) diff --git a/core/src/main/scala/kafka/message/CompressionUtils.scala b/core/src/main/scala/kafka/message/CompressionUtils.scala index 7d11993151c..607ca7779f5 100644 --- a/core/src/main/scala/kafka/message/CompressionUtils.scala +++ b/core/src/main/scala/kafka/message/CompressionUtils.scala @@ -49,7 +49,7 @@ class GZIPCompression(inputStream: InputStream, outputStream: ByteArrayOutputStr } override def read(a: Array[Byte]): Int = { - gzipIn.read(a) + gzipIn.read(a) } } diff --git a/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala b/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala index d971948d799..747bbbe498d 100644 --- a/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala +++ b/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala @@ -18,13 +18,13 @@ package kafka.producer import async.MissingConfigException -import org.apache.log4j.spi.{LoggingEvent, ErrorCode} +import org.apache.log4j.spi.LoggingEvent import org.apache.log4j.AppenderSkeleton import org.apache.log4j.helpers.LogLog -import kafka.utils.{Utils, Logging} +import kafka.utils.Logging import kafka.serializer.Encoder import java.util.{Properties, Date} -import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet} +import kafka.message.Message import scala.collection._ class KafkaLog4jAppender extends AppenderSkeleton with Logging { diff --git a/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala b/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala index e64a23481b1..2ad1a20ff2f 100644 --- a/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala +++ b/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala @@ -57,9 +57,9 @@ object VerifyConsumerRebalance extends Logging { // check if the rebalancing operation succeeded. try { if(validateRebalancingOperation(zkClient, group)) - info("Rebalance operation successful !") + println("Rebalance operation successful !") else - error("Rebalance operation failed !") + println("Rebalance operation failed !") } catch { case e2: Throwable => error("Error while verifying current rebalancing operation", e2) } @@ -132,6 +132,4 @@ object VerifyConsumerRebalance extends Logging { rebalanceSucceeded } - - -} \ No newline at end of file +} diff --git a/core/src/test/scala/other/kafka/TestKafkaAppender.scala b/core/src/test/scala/other/kafka/TestKafkaAppender.scala index dc200d04d81..8328e99e241 100644 --- a/core/src/test/scala/other/kafka/TestKafkaAppender.scala +++ b/core/src/test/scala/other/kafka/TestKafkaAppender.scala @@ -18,7 +18,7 @@ package kafka import message.Message -import org.apache.log4j.{Logger, PropertyConfigurator} +import org.apache.log4j.PropertyConfigurator import kafka.utils.Logging import serializer.Encoder diff --git a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala index 79347d83825..d6fc65da179 100644 --- a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala +++ b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala @@ -17,8 +17,6 @@ package kafka -import java.net.URI -import java.util.Arrays.asList import java.io._ import java.nio._ import java.nio.channels._ diff --git a/core/src/test/scala/other/kafka/TestLogPerformance.scala b/core/src/test/scala/other/kafka/TestLogPerformance.scala index 2a41e2c7cd8..e46151ff8e7 100644 --- a/core/src/test/scala/other/kafka/TestLogPerformance.scala +++ b/core/src/test/scala/other/kafka/TestLogPerformance.scala @@ -17,7 +17,6 @@ package kafka.log -import kafka.log._ import kafka.message._ import kafka.utils.{TestUtils, Utils} diff --git a/core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala b/core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala index aa4b9903716..9febfc8bb3e 100644 --- a/core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala +++ b/core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala @@ -22,10 +22,9 @@ import org.scalatest.junit.JUnit3Suite import org.apache.log4j.Logger import java.util.Properties import kafka.consumer.SimpleConsumer -import kafka.utils.{Utils, TestUtils} +import kafka.utils.TestUtils import kafka.api.{OffsetRequest, FetchRequest} import junit.framework.Assert._ -import java.io.File class BackwardsCompatibilityTest extends JUnit3Suite { diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala index 561f609535a..6b825f5cf1c 100644 --- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala +++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala @@ -17,11 +17,6 @@ package kafka.integration -import java.util.Properties -import junit.framework.Assert._ -import kafka.producer._ -import kafka.consumer._ -import kafka.message._ import kafka.server._ import kafka.utils.{Utils, TestUtils} import org.scalatest.junit.JUnit3Suite diff --git a/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala b/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala index d90a9f549a3..c48f7dc7487 100644 --- a/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala +++ b/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala @@ -35,7 +35,6 @@ trait BaseMessageSetTestCases extends JUnitSuite { @Test def testWrittenEqualsRead { - import scala.collection.JavaConversions._ val messageSet = createMessageSet(messages) TestUtils.checkEquals(messages.iterator, toMessageIterator(messageSet)) } diff --git a/core/src/test/scala/unit/kafka/javaapi/message/ByteBufferMessageSetTest.scala b/core/src/test/scala/unit/kafka/javaapi/message/ByteBufferMessageSetTest.scala index 67abcfc1bd4..86154d946cb 100644 --- a/core/src/test/scala/unit/kafka/javaapi/message/ByteBufferMessageSetTest.scala +++ b/core/src/test/scala/unit/kafka/javaapi/message/ByteBufferMessageSetTest.scala @@ -18,7 +18,6 @@ package kafka.javaapi.message import java.nio._ -import junit.framework.TestCase import junit.framework.Assert._ import org.junit.Test import kafka.message.{DefaultCompressionCodec, CompressionCodec, NoCompressionCodec, Message} diff --git a/core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala index 3f0fb3a238e..296bb072f93 100644 --- a/core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala +++ b/core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala @@ -24,7 +24,6 @@ import kafka.zk.EmbeddedZookeeper import kafka.utils.{TestZKUtils, TestUtils} import org.junit.{After, Before, Test} import junit.framework.Assert -import collection.mutable.HashMap import org.easymock.EasyMock import kafka.utils.Utils import java.util.concurrent.ConcurrentHashMap @@ -34,7 +33,7 @@ import org.scalatest.junit.JUnitSuite import kafka.producer.{SyncProducerConfig, Partitioner, ProducerConfig, DefaultPartitioner} import kafka.producer.ProducerPool import kafka.javaapi.message.ByteBufferMessageSet -import kafka.producer.async.{AsyncProducer, AsyncProducerConfig} +import kafka.producer.async.AsyncProducer import kafka.javaapi.Implicits._ import kafka.serializer.{StringEncoder, Encoder} import kafka.javaapi.consumer.SimpleConsumer diff --git a/core/src/test/scala/unit/kafka/javaapi/producer/SyncProducerTest.scala b/core/src/test/scala/unit/kafka/javaapi/producer/SyncProducerTest.scala index 526d53941ab..1923d240585 100644 --- a/core/src/test/scala/unit/kafka/javaapi/producer/SyncProducerTest.scala +++ b/core/src/test/scala/unit/kafka/javaapi/producer/SyncProducerTest.scala @@ -17,11 +17,11 @@ package kafka.javaapi.producer -import junit.framework.{Assert, TestCase} +import junit.framework.Assert import kafka.utils.SystemTime import kafka.utils.TestUtils import kafka.server.{KafkaServer, KafkaConfig} -import org.apache.log4j.{Logger, Level} +import org.apache.log4j.Logger import org.scalatest.junit.JUnitSuite import org.junit.{After, Before, Test} import java.util.Properties diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index 487d79f76d1..32f6f8f05b8 100644 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -18,14 +18,13 @@ package kafka.log import java.io._ -import java.nio._ import java.util.ArrayList import junit.framework.Assert._ import org.scalatest.junit.JUnitSuite import org.junit.{After, Before, Test} import kafka.utils.{Utils, TestUtils, Range} import kafka.common.OffsetOutOfRangeException -import kafka.message.{NoCompressionCodec, MessageSet, ByteBufferMessageSet, Message} +import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message} class LogTest extends JUnitSuite { diff --git a/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala b/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala index f134b3e8e19..7f67eb3d808 100644 --- a/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala +++ b/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala @@ -23,13 +23,12 @@ import java.util.Properties import java.io.File import kafka.consumer.SimpleConsumer import kafka.server.{KafkaConfig, KafkaServer} -import kafka.utils.TestUtils import kafka.utils.{TestUtils, TestZKUtils,Utils, Logging} import kafka.zk.EmbeddedZookeeper import junit.framework.Assert._ import kafka.api.FetchRequest import kafka.serializer.Encoder -import kafka.message.{MessageSet, Message} +import kafka.message.Message import kafka.producer.async.MissingConfigException import org.scalatest.junit.JUnitSuite import org.junit.{After, Before, Test} diff --git a/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala b/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala index 7317fa486c8..a6dc642f54a 100644 --- a/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala +++ b/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala @@ -17,7 +17,6 @@ package kafka.message -import java.util.Arrays import junit.framework.Assert._ import kafka.utils.TestUtils._ import org.scalatest.junit.JUnitSuite diff --git a/core/src/test/scala/unit/kafka/message/FileMessageSetTest.scala b/core/src/test/scala/unit/kafka/message/FileMessageSetTest.scala index 4b8ab195818..a683963c9f1 100644 --- a/core/src/test/scala/unit/kafka/message/FileMessageSetTest.scala +++ b/core/src/test/scala/unit/kafka/message/FileMessageSetTest.scala @@ -18,12 +18,9 @@ package kafka.message import java.nio._ -import java.util.Arrays -import junit.framework.TestCase import junit.framework.Assert._ import kafka.utils.TestUtils._ import org.junit.Test -import kafka.message._ class FileMessageSetTest extends BaseMessageSetTestCases { diff --git a/core/src/test/scala/unit/kafka/message/MessageTest.scala b/core/src/test/scala/unit/kafka/message/MessageTest.scala index 0df25da0fe8..4e3184cf178 100644 --- a/core/src/test/scala/unit/kafka/message/MessageTest.scala +++ b/core/src/test/scala/unit/kafka/message/MessageTest.scala @@ -19,9 +19,6 @@ package kafka.message import java.util._ import java.nio._ -import java.nio.channels._ -import java.io._ -import junit.framework.TestCase import junit.framework.Assert._ import org.scalatest.junit.JUnitSuite import org.junit.{Before, Test} diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala index 00fc5f534cb..cae66516ff7 100644 --- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala +++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala @@ -19,13 +19,9 @@ package kafka.network; import java.net._ import java.io._ -import java.nio._ -import java.nio.channels._ import org.junit._ -import junit.framework.Assert._ import org.scalatest.junit.JUnitSuite import kafka.utils.TestUtils -import kafka.network._ import java.util.Random import org.apache.log4j._ diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala index bff30f59bac..b87dc3d87ac 100644 --- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala @@ -17,7 +17,7 @@ package kafka.producer -import junit.framework.{Assert, TestCase} +import junit.framework.Assert import java.util.Properties import org.easymock.EasyMock import kafka.api.ProducerRequest diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala index b66a34daa25..7a6273cc527 100644 --- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala @@ -17,14 +17,13 @@ package kafka.producer -import async.{AsyncProducerConfig, AsyncProducer} +import async.AsyncProducer import java.util.Properties import org.apache.log4j.{Logger, Level} import kafka.server.{KafkaRequestHandlers, KafkaServer, KafkaConfig} import kafka.zk.EmbeddedZookeeper import org.junit.{After, Before, Test} import junit.framework.Assert -import collection.mutable.HashMap import org.easymock.EasyMock import java.util.concurrent.ConcurrentHashMap import kafka.cluster.Partition diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala index 29dbe7ea691..ac811944c70 100644 --- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala @@ -17,11 +17,11 @@ package kafka.producer -import junit.framework.{Assert, TestCase} +import junit.framework.Assert import kafka.utils.SystemTime import kafka.utils.TestUtils import kafka.server.{KafkaServer, KafkaConfig} -import org.apache.log4j.{Logger, Level} +import org.apache.log4j.Logger import org.scalatest.junit.JUnitSuite import org.junit.{After, Before, Test} import kafka.common.MessageSizeTooLargeException diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala index f0e862b44c3..f6806762a60 100644 --- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala +++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala @@ -20,13 +20,11 @@ import kafka.utils.TestUtils import java.io.File import kafka.utils.Utils import kafka.api.FetchRequest -import kafka.integration.ProducerConsumerTestHarness import kafka.producer.{SyncProducer, SyncProducerConfig} import kafka.consumer.SimpleConsumer import java.util.Properties import org.scalatest.junit.JUnitSuite -import junit.framework.{Assert, TestCase} -import org.junit.{After, Before, Test} +import org.junit.Test import junit.framework.Assert._ import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet}