Merge remote-tracking branch 'origin/trunk' into copycat

commit c0e5fdcff5
Author: Ewen Cheslack-Postava
Date: 2015-08-12 20:34:52 -07:00

87 changed files with 280 additions and 297 deletions
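Nearly every file in this merge applies the same mechanical migration: the Scala tests drop JUnit 3 (org.scalatest.junit.JUnit3Suite and junit.framework.Assert, where setUp/tearDown and test-prefixed methods run by naming convention) in favor of JUnit 4 (org.junit.Assert plus explicit @Before/@After/@Test annotations). A minimal sketch of the pattern on a hypothetical ExampleTest, not one of the files changed below:

    // Before: JUnit 3. Lifecycle methods are discovered by name.
    import junit.framework.Assert._
    import org.scalatest.junit.JUnit3Suite

    class ExampleTest extends JUnit3Suite with ZooKeeperTestHarness {
      override def setUp() { super.setUp() }            // runs first, by naming convention
      def testSomething() { assertTrue(1 + 1 == 2) }    // runs because of the "test" prefix
      override def tearDown() { super.tearDown() }      // runs last, by naming convention
    }

    // After: JUnit 4. Lifecycle methods are discovered by annotation, so the
    // JUnit3Suite mixin goes away and each override is annotated explicitly.
    import org.junit.Assert._
    import org.junit.{After, Before, Test}

    class ExampleTest extends ZooKeeperTestHarness {
      @Before override def setUp() { super.setUp() }
      @Test def testSomething() { assertTrue(1 + 1 == 2) }
      @After override def tearDown() { super.tearDown() }
    }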

View File

@@ -19,6 +19,7 @@ import org.apache.kafka.clients.consumer._
 import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.TopicPartition
 import org.junit.Assert._
+import org.junit.Before
 import scala.collection.JavaConversions._
@@ -52,6 +53,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
       .map(KafkaConfig.fromProps(_, serverConfig))
   }

+  @Before
   override def setUp() {
     super.setUp()

View File

@@ -25,6 +25,7 @@ import kafka.server.KafkaConfig
 import java.util.ArrayList
 import org.junit.Assert._
+import org.junit.Before
 import scala.collection.JavaConverters._
 import kafka.coordinator.ConsumerCoordinator
@@ -56,6 +57,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
   this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
   this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")

+  @Before
   override def setUp() {
     super.setUp()

View File

@@ -25,6 +25,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.apache.kafka.clients.producer.KafkaProducer
 import kafka.server.{OffsetManager, KafkaConfig}
 import kafka.integration.KafkaServerTestHarness
+import org.junit.{After, Before}
 import scala.collection.mutable.Buffer
 import kafka.coordinator.ConsumerCoordinator
@@ -49,6 +50,7 @@ trait IntegrationTestHarness extends KafkaServerTestHarness {
     cfgs.map(KafkaConfig.fromProps)
   }

+  @Before
   override def setUp() {
     super.setUp()
     producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapUrl)
@@ -70,7 +72,8 @@ trait IntegrationTestHarness extends KafkaServerTestHarness {
       servers,
       servers(0).consumerCoordinator.offsetsTopicConfigs)
   }

+  @After
   override def tearDown() {
     producers.foreach(_.close())
     consumers.foreach(_.close())

View File

@@ -22,7 +22,7 @@ import kafka.utils.{ShutdownableThread, TestUtils}
 import org.apache.kafka.clients.producer._
 import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
 import org.junit.Assert._
-import org.junit.Test
+import org.junit.{After, Before, Test}

 class ProducerBounceTest extends KafkaServerTestHarness {
   private val producerBufferSize = 30000
@@ -62,6 +62,7 @@ class ProducerBounceTest extends KafkaServerTestHarness {
   private val topic1 = "topic-1"
   private val topic2 = "topic-2"

+  @Before
   override def setUp() {
     super.setUp()
@@ -70,6 +71,7 @@ class ProducerBounceTest extends KafkaServerTestHarness {
     producer3 = TestUtils.createNewProducer(brokerList, acks = -1, blockOnBufferFull = false, bufferSize = producerBufferSize)
   }

+  @After
   override def tearDown() {
     if (producer1 != null) producer1.close
     if (producer2 != null) producer2.close

View File

@@ -19,7 +19,6 @@ package kafka.api.test
 import java.util.{Properties, Collection, ArrayList}
-import org.scalatest.junit.JUnit3Suite
 import org.junit.runners.Parameterized
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized.Parameters
@@ -36,7 +35,7 @@ import kafka.utils.{CoreUtils, TestUtils}
 @RunWith(value = classOf[Parameterized])
-class ProducerCompressionTest(compression: String) extends JUnit3Suite with ZooKeeperTestHarness {
+class ProducerCompressionTest(compression: String) extends ZooKeeperTestHarness {
   private val brokerId = 0
   private var server: KafkaServer = null

View File

@@ -17,22 +17,20 @@
 package kafka.api

-import org.junit.Test
-import org.junit.Assert._
+import java.util.concurrent.{ExecutionException, TimeUnit, TimeoutException}
 import java.util.{Properties, Random}
-import java.util.concurrent.{TimeoutException, TimeUnit, ExecutionException}
 import kafka.common.Topic
 import kafka.consumer.SimpleConsumer
 import kafka.integration.KafkaServerTestHarness
 import kafka.server.KafkaConfig
 import kafka.utils.{ShutdownableThread, TestUtils}
-import org.apache.kafka.common.KafkaException
-import org.apache.kafka.common.errors.{InvalidTopicException, NotEnoughReplicasException, NotEnoughReplicasAfterAppendException}
 import org.apache.kafka.clients.producer._
 import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
+import org.apache.kafka.common.KafkaException
+import org.apache.kafka.common.errors.{InvalidTopicException, NotEnoughReplicasAfterAppendException, NotEnoughReplicasException}
+import org.junit.Assert._
+import org.junit.{After, Before, Test}

 class ProducerFailureHandlingTest extends KafkaServerTestHarness {
   private val producerBufferSize = 30000
@@ -61,6 +59,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
   private val topic1 = "topic-1"
   private val topic2 = "topic-2"

+  @Before
   override def setUp() {
     super.setUp()
@@ -69,6 +68,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
     producer3 = TestUtils.createNewProducer(brokerList, acks = -1, blockOnBufferFull = false, bufferSize = producerBufferSize)
   }

+  @After
   override def tearDown() {
     if (producer1 != null) producer1.close
     if (producer2 != null) producer2.close

View File

@@ -30,11 +30,9 @@ import org.apache.kafka.common.config.ConfigException
 import org.apache.kafka.common.errors.SerializationException
 import org.apache.kafka.common.serialization.ByteArraySerializer
 import org.junit.Assert._
-import org.junit.Test
-import org.scalatest.junit.JUnit3Suite
+import org.junit.{After, Before, Test}

-class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness {
+class ProducerSendTest extends KafkaServerTestHarness {
   val numServers = 2
   val overridingProps = new Properties()
@@ -49,6 +47,7 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness {
   private val topic = "topic"
   private val numRecords = 100

+  @Before
   override def setUp() {
     super.setUp()
@@ -57,6 +56,7 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness {
     consumer2 = new SimpleConsumer("localhost", servers(1).boundPort(), 100, 1024*1024, "")
   }

+  @After
   override def tearDown() {
     consumer1.close()
     consumer2.close()

View File

@@ -21,7 +21,7 @@ import java.security.Permission
 import kafka.server.KafkaConfig
 import org.junit.{After, Before, Test}
-import junit.framework.Assert._
+import org.junit.Assert._

 class KafkaTest {

View File

@@ -17,17 +17,17 @@
 package kafka.admin

+import org.junit.Assert._
 import org.apache.kafka.common.protocol.SecurityProtocol
-import org.scalatest.junit.JUnit3Suite
 import kafka.zk.ZooKeeperTestHarness
 import kafka.utils.TestUtils._
-import junit.framework.Assert._
 import kafka.utils.{ZkUtils, CoreUtils, TestUtils}
 import kafka.cluster.Broker
 import kafka.client.ClientUtils
 import kafka.server.{KafkaConfig, KafkaServer}
+import org.junit.{After, Before}

-class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness {
+class AddPartitionsTest extends ZooKeeperTestHarness {
   var configs: Seq[KafkaConfig] = null
   var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
   var brokers: Seq[Broker] = Seq.empty[Broker]
@@ -39,6 +39,7 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness {
   val topic3 = "new-topic3"
   val topic4 = "new-topic4"

+  @Before
   override def setUp() {
     super.setUp()
@@ -54,6 +55,7 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness {
     createTopic(zkClient, topic4, partitionReplicaAssignment = Map(0->Seq(0,3)), servers = servers)
   }

+  @After
   override def tearDown() {
     servers.foreach(_.shutdown())
     servers.foreach(server => CoreUtils.rm(server.config.logDirs))

View File

@@ -18,7 +18,6 @@ package kafka.admin
 import junit.framework.Assert._
 import org.junit.Test
-import org.scalatest.junit.JUnit3Suite
 import java.util.Properties
 import kafka.utils._
 import kafka.log._
@@ -30,7 +29,7 @@ import java.io.File
 import TestUtils._

-class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
+class AdminTest extends ZooKeeperTestHarness with Logging {
   @Test
   def testReplicaAssignment() {

View File

@@ -19,15 +19,10 @@ package kafka.admin
 import junit.framework.Assert._
 import kafka.admin.ConfigCommand.ConfigCommandOptions
 import org.junit.Test
-import org.scalatest.junit.JUnit3Suite
 import kafka.utils.Logging
-import kafka.utils.TestUtils
 import kafka.zk.ZooKeeperTestHarness
-import kafka.server.{ConfigType, OffsetManager, KafkaConfig}
-import kafka.admin.TopicCommand.TopicCommandOptions
-import kafka.utils.ZkUtils

-class ConfigCommandTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
+class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
   @Test
   def testArgumentParse() {
     // Should parse correctly

View File

@@ -16,7 +16,6 @@
  */
 package kafka.admin
-import org.scalatest.junit.JUnit3Suite
 import kafka.utils._
 import kafka.server.KafkaConfig
 import org.junit.Test
@@ -25,7 +24,7 @@ import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer}
 import kafka.integration.KafkaServerTestHarness

-class DeleteConsumerGroupTest extends JUnit3Suite with KafkaServerTestHarness {
+class DeleteConsumerGroupTest extends KafkaServerTestHarness {
   def generateConfigs() = TestUtils.createBrokerConfigs(3, zkConnect, false, true).map(KafkaConfig.fromProps)
   @Test

View File

@@ -17,7 +17,6 @@
 package kafka.admin
 import kafka.log.Log
-import org.scalatest.junit.JUnit3Suite
 import kafka.zk.ZooKeeperTestHarness
 import junit.framework.Assert._
 import kafka.utils.{ZkUtils, TestUtils}
@@ -26,7 +25,7 @@ import org.junit.Test
 import java.util.Properties
 import kafka.common.{TopicAlreadyMarkedForDeletionException, TopicAndPartition}

-class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
+class DeleteTopicTest extends ZooKeeperTestHarness {
   @Test
   def testDeleteTopicWithAllAliveReplicas() {

View File

@@ -18,16 +18,15 @@ package kafka.admin
 import junit.framework.Assert._
 import org.junit.Test
-import org.scalatest.junit.JUnit3Suite
 import kafka.utils.Logging
 import kafka.utils.TestUtils
 import kafka.zk.ZooKeeperTestHarness
-import kafka.server.{ConfigType, OffsetManager, KafkaConfig}
+import kafka.server.ConfigType
 import kafka.admin.TopicCommand.TopicCommandOptions
 import kafka.utils.ZkUtils
 import kafka.coordinator.ConsumerCoordinator

-class TopicCommandTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
+class TopicCommandTest extends ZooKeeperTestHarness with Logging {
   @Test
   def testConfigPreservationAcrossPartitionAlteration() {

View File

@@ -19,7 +19,7 @@ package kafka.api
 import org.junit._
 import org.scalatest.junit.JUnitSuite
-import junit.framework.Assert._
+import org.junit.Assert._
 import scala.util.Random
 import java.nio.ByteBuffer
 import kafka.common.KafkaException

View File

@@ -32,7 +32,7 @@ import java.nio.ByteBuffer
 import org.apache.kafka.common.protocol.SecurityProtocol
 import org.junit._
 import org.scalatest.junit.JUnitSuite
-import junit.framework.Assert._
+import org.junit.Assert._

 object SerializationTestUtils {

View File

@@ -22,11 +22,10 @@ import java.nio.ByteBuffer
 import kafka.utils.Logging
 import org.apache.kafka.common.protocol.SecurityProtocol
 import org.junit.Test
-import org.scalatest.junit.JUnit3Suite
 import scala.collection.mutable

-class BrokerEndPointTest extends JUnit3Suite with Logging {
+class BrokerEndPointTest extends Logging {
   @Test
   def testSerDe() = {

View File

@@ -17,7 +17,7 @@
 package kafka.common
-import junit.framework.Assert._
+import org.junit.Assert._
 import collection.mutable.ArrayBuffer
 import org.junit.Test
 import kafka.producer.ProducerConfig

View File

@@ -17,7 +17,7 @@
 package kafka.common
-import junit.framework.Assert._
+import org.junit.Assert._
 import collection.mutable.ArrayBuffer
 import org.junit.Test

View File

@@ -18,22 +18,20 @@
 package kafka.consumer

-import java.util.Properties
 import java.util.concurrent._
 import java.util.concurrent.atomic._
 import scala.collection._
-import junit.framework.Assert._
+import org.junit.Assert._
 import kafka.message._
 import kafka.server._
 import kafka.utils.TestUtils._
 import kafka.utils._
-import org.junit.Test
+import org.junit.{Before, Test}
 import kafka.serializer._
-import org.scalatest.junit.JUnit3Suite
 import kafka.integration.KafkaServerTestHarness

-class ConsumerIteratorTest extends JUnit3Suite with KafkaServerTestHarness {
+class ConsumerIteratorTest extends KafkaServerTestHarness {
   val numNodes = 1
@@ -49,6 +47,7 @@ class ConsumerIteratorTest extends JUnit3Suite with KafkaServerTestHarness {
   def consumerConfig = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer0))

+  @Before
   override def setUp() {
     super.setUp()
     topicInfos = configs.map(c => new PartitionTopicInfo(topic,

View File

@@ -17,18 +17,17 @@
 package kafka.consumer

-import org.scalatest.junit.JUnit3Suite
 import org.easymock.EasyMock
 import org.I0Itec.zkclient.ZkClient
 import org.apache.zookeeper.data.Stat
 import kafka.utils.{TestUtils, Logging, ZkUtils, Json}
-import junit.framework.Assert._
+import org.junit.Assert._
 import kafka.common.TopicAndPartition
 import kafka.consumer.PartitionAssignorTest.StaticSubscriptionInfo
 import kafka.consumer.PartitionAssignorTest.Scenario
 import kafka.consumer.PartitionAssignorTest.WildcardSubscriptionInfo

-class PartitionAssignorTest extends JUnit3Suite with Logging {
+class PartitionAssignorTest extends Logging {
   def testRoundRobinPartitionAssignor() {
     val assignor = new RoundRobinAssignor

View File

@@ -18,7 +18,7 @@
 package kafka.consumer
-import junit.framework.Assert._
+import org.junit.Assert._
 import org.scalatest.junit.JUnitSuite
 import org.junit.Test
 import kafka.server.OffsetManager

View File

@@ -19,7 +19,7 @@ package kafka.consumer
 import java.util.{Collections, Properties}
-import junit.framework.Assert._
+import org.junit.Assert._
 import kafka.common.MessageStreamsExistException
 import kafka.integration.KafkaServerTestHarness
 import kafka.javaapi.consumer.ConsumerRebalanceListener
@@ -30,11 +30,11 @@ import kafka.utils.TestUtils._
 import kafka.utils._
 import org.I0Itec.zkclient.ZkClient
 import org.apache.log4j.{Level, Logger}
-import org.scalatest.junit.JUnit3Suite
+import org.junit.{After, Before}
 import scala.collection._

-class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with Logging {
+class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging {
   val RebalanceBackoffMs = 5000
   var dirs : ZKGroupTopicDirs = null
@@ -54,11 +54,13 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
   val consumer3 = "consumer3"
   val nMessages = 2

+  @Before
   override def setUp() {
     super.setUp()
     dirs = new ZKGroupTopicDirs(group, topic)
   }

+  @After
   override def tearDown() {
     super.tearDown()
   }

View File

@@ -20,10 +20,10 @@ package kafka.coordinator
 import java.util.concurrent.TimeUnit
-import junit.framework.Assert._
+import org.junit.Assert._
 import kafka.common.TopicAndPartition
-import kafka.server.{OffsetManager, ReplicaManager, KafkaConfig}
-import kafka.utils.{KafkaScheduler, TestUtils}
+import kafka.server.{OffsetManager, KafkaConfig}
+import kafka.utils.TestUtils
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.JoinGroupRequest
 import org.easymock.EasyMock

View File

@@ -17,7 +17,7 @@
 package kafka.coordinator
-import junit.framework.Assert._
+import org.junit.Assert._
 import org.junit.{Before, Test}
 import org.scalatest.junit.JUnitSuite

View File

@@ -20,7 +20,7 @@ package kafka.coordinator
 import kafka.server.KafkaConfig
 import kafka.utils.{ZkUtils, TestUtils}
-import junit.framework.Assert._
+import org.junit.Assert._
 import org.I0Itec.zkclient.{IZkDataListener, ZkClient}
 import org.apache.zookeeper.data.Stat
 import org.easymock.EasyMock

View File

@@ -19,7 +19,7 @@ package kafka.coordinator
 import kafka.common.TopicAndPartition
-import junit.framework.Assert._
+import org.junit.Assert._
 import org.junit.Test
 import org.scalatest.junit.JUnitSuite

View File

@@ -24,12 +24,11 @@ import kafka.utils.TestUtils
 import kafka.serializer._
 import kafka.producer.{Producer, KeyedMessage}
-import org.junit.Test
+import org.junit.{After, Before, Test}
 import org.apache.log4j.{Level, Logger}
-import org.scalatest.junit.JUnit3Suite
-import junit.framework.Assert._
+import org.junit.Assert._

-class AutoOffsetResetTest extends JUnit3Suite with KafkaServerTestHarness with Logging {
+class AutoOffsetResetTest extends KafkaServerTestHarness with Logging {
   def generateConfigs() = List(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect)))
@@ -42,12 +41,14 @@ class AutoOffsetResetTest extends JUnit3Suite with KafkaServerTestHarness with L
   val requestHandlerLogger = Logger.getLogger(classOf[kafka.server.KafkaRequestHandler])

+  @Before
   override def setUp() {
     super.setUp()
     // temporarily set request handler logger to a higher level
     requestHandlerLogger.setLevel(Level.FATAL)
   }

+  @After
   override def tearDown() {
     // restore set request handler logger to a higher level
     requestHandlerLogger.setLevel(Level.ERROR)

View File

@@ -19,18 +19,17 @@ package kafka.integration
 import java.util.concurrent._
 import java.util.concurrent.atomic._
+import org.junit.{After, Before}
 import scala.collection._
-import junit.framework.Assert._
+import org.junit.Assert._
 import kafka.cluster._
 import kafka.server._
-import org.scalatest.junit.JUnit3Suite
 import kafka.consumer._
-import kafka.serializer._
-import kafka.producer.{KeyedMessage, Producer}
 import kafka.utils.TestUtils

-class FetcherTest extends JUnit3Suite with KafkaServerTestHarness {
+class FetcherTest extends KafkaServerTestHarness {
   val numNodes = 1
   def generateConfigs() = TestUtils.createBrokerConfigs(numNodes, zkConnect).map(KafkaConfig.fromProps)
@@ -40,6 +39,7 @@ class FetcherTest extends JUnit3Suite with KafkaServerTestHarness {
   var fetcher: ConsumerFetcherManager = null

+  @Before
   override def setUp() {
     super.setUp
     TestUtils.createTopic(zkClient, topic, partitionReplicaAssignment = Map(0 -> Seq(configs.head.brokerId)), servers = servers)
@@ -59,6 +59,7 @@ class FetcherTest extends JUnit3Suite with KafkaServerTestHarness {
     fetcher.startConnections(topicInfos, cluster)
   }

+  @After
   override def tearDown() {
     fetcher.stopConnections()
     super.tearDown

View File

@@ -19,17 +19,18 @@ package kafka.integration
 import java.util.Arrays

-import scala.collection.mutable.Buffer
+import kafka.common.KafkaException
 import kafka.server._
 import kafka.utils.{CoreUtils, TestUtils}
-import org.scalatest.junit.JUnit3Suite
 import kafka.zk.ZooKeeperTestHarness
-import kafka.common.KafkaException
+import org.junit.{After, Before}
+import scala.collection.mutable.Buffer

 /**
  * A test harness that brings up some number of broker nodes
  */
-trait KafkaServerTestHarness extends JUnit3Suite with ZooKeeperTestHarness {
+trait KafkaServerTestHarness extends ZooKeeperTestHarness {
   var instanceConfigs: Seq[KafkaConfig] = null
   var servers: Buffer[KafkaServer] = null
   var brokerList: String = null
@@ -51,7 +52,7 @@ trait KafkaServerTestHarness extends JUnit3Suite with ZooKeeperTestHarness {
   def bootstrapUrl = servers.map(s => s.config.hostName + ":" + s.boundPort()).mkString(",")

+  @Before
   override def setUp() {
     super.setUp
     if(configs.size <= 0)
@@ -62,6 +63,7 @@ trait KafkaServerTestHarness extends JUnit3Suite with ZooKeeperTestHarness {
     Arrays.fill(alive, true)
   }

+  @After
   override def tearDown() {
     servers.foreach(_.shutdown())
     servers.foreach(_.config.logDirs.foreach(CoreUtils.rm(_)))

View File

@@ -21,9 +21,8 @@ import java.util.Properties
 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
-import org.scalatest.junit.JUnit3Suite

-class MinIsrConfigTest extends JUnit3Suite with KafkaServerTestHarness {
+class MinIsrConfigTest extends KafkaServerTestHarness {
   val overridingProps = new Properties()
   overridingProps.put(KafkaConfig.MinInSyncReplicasProp, "5")

View File

@@ -18,13 +18,12 @@
 package kafka.integration
 import java.nio.ByteBuffer
-import junit.framework.Assert._
+import org.junit.Assert._
 import kafka.api.{PartitionFetchInfo, FetchRequest, FetchRequestBuilder}
 import kafka.server.{KafkaRequestHandler, KafkaConfig}
 import kafka.producer.{KeyedMessage, Producer}
 import org.apache.log4j.{Level, Logger}
 import kafka.zk.ZooKeeperTestHarness
-import org.scalatest.junit.JUnit3Suite
 import scala.collection._
 import kafka.common.{TopicAndPartition, ErrorMapping, UnknownTopicOrPartitionException, OffsetOutOfRangeException}
 import kafka.utils.{StaticPartitioner, TestUtils, CoreUtils}
@@ -34,7 +33,7 @@ import java.util.Properties
 /**
  * End to end tests of the primitive apis against a local server
  */
-class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with ZooKeeperTestHarness {
+class PrimitiveApiTest extends ProducerConsumerTestHarness with ZooKeeperTestHarness {
   val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
   def generateConfigs() = List(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect)))

View File

@@ -5,8 +5,8 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License. You may obtain a copy of the License at
- *
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,28 +18,30 @@
 package kafka.integration
 import kafka.consumer.SimpleConsumer
-import org.scalatest.junit.JUnit3Suite
+import org.junit.{After, Before}
 import kafka.producer.Producer
 import kafka.utils.{StaticPartitioner, TestUtils}
 import kafka.serializer.StringEncoder

-trait ProducerConsumerTestHarness extends JUnit3Suite with KafkaServerTestHarness {
+trait ProducerConsumerTestHarness extends KafkaServerTestHarness {
   val host = "localhost"
   var producer: Producer[String, String] = null
   var consumer: SimpleConsumer = null

+  @Before
   override def setUp() {
     super.setUp
     producer = TestUtils.createProducer[String, String](TestUtils.getBrokerListStrFromServers(servers),
       encoder = classOf[StringEncoder].getName,
       keyEncoder = classOf[StringEncoder].getName,
       partitioner = classOf[StaticPartitioner].getName)
-    consumer = new SimpleConsumer(host, servers(0).boundPort(), 1000000, 64*1024, "")
+    consumer = new SimpleConsumer(host, servers(0).boundPort(), 1000000, 64 * 1024, "")
   }

+  @After
   override def tearDown() {
     producer.close()
     consumer.close()
     super.tearDown
   }
 }

View File

@@ -17,18 +17,19 @@
 package kafka.integration

-import org.scalatest.junit.JUnit3Suite
+import org.junit.{After, Before}
 import kafka.zk.ZooKeeperTestHarness
 import kafka.utils.TestUtils._
-import junit.framework.Assert._
+import org.junit.Assert._
 import kafka.utils.{CoreUtils, TestUtils}
 import kafka.server.{KafkaConfig, KafkaServer}

-class RollingBounceTest extends JUnit3Suite with ZooKeeperTestHarness {
+class RollingBounceTest extends ZooKeeperTestHarness {
   val partitionId = 0
   var servers: Seq[KafkaServer] = null

+  @Before
   override def setUp() {
     super.setUp()
     // controlled.shutdown.enable is true by default
@@ -39,6 +40,7 @@ class RollingBounceTest extends JUnit3Suite with ZooKeeperTestHarness {
     servers = configs.map(c => TestUtils.createServer(KafkaConfig.fromProps(c)))
   }

+  @After
   override def tearDown() {
     servers.foreach(_.shutdown())
     servers.foreach(server => CoreUtils.rm(server.config.logDirs))

View File

@@ -19,9 +19,8 @@ package kafka.integration
 import java.nio.ByteBuffer

-import junit.framework.Assert._
 import kafka.admin.AdminUtils
-import kafka.api.{TopicMetadataResponse, TopicMetadataRequest}
+import kafka.api.{TopicMetadataRequest, TopicMetadataResponse}
 import kafka.client.ClientUtils
 import kafka.cluster.{Broker, BrokerEndPoint}
 import kafka.common.ErrorMapping
@@ -30,14 +29,16 @@ import kafka.utils.TestUtils
 import kafka.utils.TestUtils._
 import kafka.zk.ZooKeeperTestHarness
 import org.apache.kafka.common.protocol.SecurityProtocol
-import org.scalatest.junit.JUnit3Suite
+import org.junit.Assert._
+import org.junit.{After, Before}

-class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
+class TopicMetadataTest extends ZooKeeperTestHarness {
   private var server1: KafkaServer = null
   var brokerEndPoints: Seq[BrokerEndPoint] = null
   var adHocConfigs: Seq[KafkaConfig] = null
   val numConfigs: Int = 4

+  @Before
   override def setUp() {
     super.setUp()
     val props = createBrokerConfigs(numConfigs, zkConnect)
@@ -47,6 +48,7 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
     brokerEndPoints = Seq(new Broker(server1.config.brokerId, server1.config.hostName, server1.boundPort()).getBrokerEndPoint(SecurityProtocol.PLAINTEXT))
   }

+  @After
   override def tearDown() {
     server1.shutdown()
     super.tearDown()

View File

@@ -18,24 +18,22 @@
 package kafka.integration

 import org.apache.kafka.common.config.ConfigException
-import scala.collection.mutable.MutableList
+import org.junit.{After, Before}
 import scala.util.Random
 import org.apache.log4j.{Level, Logger}
-import org.scalatest.junit.JUnit3Suite
 import java.util.Properties
-import junit.framework.Assert._
 import kafka.admin.AdminUtils
 import kafka.common.FailedToSendMessageException
-import kafka.consumer.{Consumer, ConsumerConfig, ConsumerTimeoutException}
+import kafka.consumer.{Consumer, ConsumerConfig}
-import kafka.producer.{KeyedMessage, Producer}
 import kafka.serializer.StringDecoder
 import kafka.server.{KafkaConfig, KafkaServer}
 import kafka.utils.CoreUtils
 import kafka.utils.TestUtils._
 import kafka.zk.ZooKeeperTestHarness
+import org.junit.Assert._

-class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
+class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
   val brokerId1 = 0
   val brokerId2 = 1
@@ -58,6 +56,7 @@ class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
   val syncProducerLogger = Logger.getLogger(classOf[kafka.producer.SyncProducer])
   val eventHandlerLogger = Logger.getLogger(classOf[kafka.producer.async.DefaultEventHandler[Object, Object]])

+  @Before
   override def setUp() {
     super.setUp()
@@ -77,6 +76,7 @@ class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
     eventHandlerLogger.setLevel(Level.FATAL)
   }

+  @After
   override def tearDown() {
     servers.foreach(server => shutdownServer(server))
     servers.foreach(server => CoreUtils.rm(server.config.logDirs))

View File

@@ -20,7 +20,6 @@ package kafka.javaapi.consumer
 import java.util.Properties
 import kafka.server._
-import kafka.message._
 import kafka.serializer._
 import kafka.integration.KafkaServerTestHarness
 import kafka.producer.KeyedMessage
@@ -33,12 +32,11 @@ import kafka.common.MessageStreamsExistException
 import scala.collection.JavaConversions

-import org.scalatest.junit.JUnit3Suite
 import org.apache.log4j.{Level, Logger}
-import junit.framework.Assert._
+import org.junit.Assert._

-class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with ZooKeeperTestHarness with Logging {
+class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with ZooKeeperTestHarness with Logging {
   val numNodes = 2
   val numParts = 2
   val topic = "topic1"

View File

@@ -17,7 +17,7 @@
 package kafka.javaapi.message
-import junit.framework.Assert._
+import org.junit.Assert._
 import org.scalatest.junit.JUnitSuite
 import org.junit.Test
 import kafka.utils.TestUtils

View File

@@ -17,7 +17,7 @@
 package kafka.javaapi.message
-import junit.framework.Assert._
+import org.junit.Assert._
 import org.junit.Test
 import kafka.message.{DefaultCompressionCodec, CompressionCodec, NoCompressionCodec, Message}

View File

@@ -17,19 +17,20 @@
 package kafka.log

-import java.util.Properties
-import junit.framework.Assert._
-import org.scalatest.junit.JUnitSuite
-import org.junit.{After, Test}
-import java.nio._
 import java.io.File
-import scala.collection._
-import kafka.common._
-import kafka.utils._
-import kafka.message._
+import java.nio._
+import java.util.Properties
 import java.util.concurrent.atomic.AtomicLong
+import kafka.common._
+import kafka.message._
+import kafka.utils._
 import org.apache.kafka.common.utils.Utils
+import org.junit.Assert._
+import org.junit.{After, Test}
+import org.scalatest.junit.JUnitSuite
+import scala.collection._

 /**
  * Unit tests for the log cleaning logic
View File

@@ -20,7 +20,7 @@ package kafka.log
 import java.io._
 import java.nio._
 import java.util.concurrent.atomic._
-import junit.framework.Assert._
+import org.junit.Assert._
 import kafka.utils.TestUtils._
 import kafka.message._
 import org.junit.Test

View File

@@ -30,16 +30,14 @@ import org.junit._
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
 import org.junit.runners.Parameterized.Parameters
-import org.scalatest.junit.JUnit3Suite
 import scala.collection._

 /**
  * This is an integration test that tests the fully integrated log cleaner
  */
 @RunWith(value = classOf[Parameterized])
-class LogCleanerIntegrationTest(compressionCodec: String) extends JUnit3Suite {
+class LogCleanerIntegrationTest(compressionCodec: String) {
   val time = new MockTime()
   val segmentSize = 100

View File

@@ -21,9 +21,9 @@ import java.util.Properties
 import org.apache.kafka.common.config.ConfigException
 import org.junit.{Assert, Test}
-import org.scalatest.junit.JUnit3Suite
+import org.scalatest.Assertions._

-class LogConfigTest extends JUnit3Suite {
+class LogConfigTest {
   @Test
   def testFromPropsEmpty() {

View File

@@ -19,14 +19,14 @@ package kafka.log
 import java.io._
 import java.util.Properties
-import junit.framework.Assert._
-import org.junit.Test
-import org.scalatest.junit.JUnit3Suite
-import kafka.server.{BrokerState, OffsetCheckpoint}
-import kafka.common._
-import kafka.utils._
+import kafka.common._
+import kafka.server.OffsetCheckpoint
+import kafka.utils._
+import org.junit.Assert._
+import org.junit.{After, Before, Test}

-class LogManagerTest extends JUnit3Suite {
+class LogManagerTest {
   val time: MockTime = new MockTime()
   val maxRollInterval = 100
@@ -41,20 +41,20 @@ class LogManagerTest {
   val name = "kafka"
   val veryLargeLogFlushInterval = 10000000L

-  override def setUp() {
-    super.setUp()
+  @Before
+  def setUp() {
     logDir = TestUtils.tempDir()
     logManager = createLogManager()
     logManager.startup
     logDir = logManager.logDirs(0)
   }

-  override def tearDown() {
+  @After
+  def tearDown() {
     if(logManager != null)
       logManager.shutdown()
     CoreUtils.rm(logDir)
     logManager.logDirs.foreach(CoreUtils.rm(_))
-    super.tearDown()
   }

   /**

View File

@@ -16,19 +16,15 @@
  */
 package kafka.log

-import junit.framework.Assert._
+import org.junit.Assert._
 import java.util.concurrent.atomic._
-import java.io.File
-import java.io.RandomAccessFile
-import java.util.Random
 import org.junit.{Test, After}
-import org.scalatest.junit.JUnit3Suite
 import kafka.utils.TestUtils
 import kafka.message._
 import kafka.utils.SystemTime
 import scala.collection._

-class LogSegmentTest extends JUnit3Suite {
+class LogSegmentTest {
   val segments = mutable.ArrayBuffer[LogSegment]()

View File

@@ -20,7 +20,7 @@ package kafka.log
 import java.io._
 import java.util.Properties
 import java.util.concurrent.atomic._
-import junit.framework.Assert._
+import org.junit.Assert._
 import org.scalatest.junit.JUnitSuite
 import org.junit.{After, Before, Test}
 import kafka.message._

View File

@@ -18,7 +18,7 @@
 package kafka.log
 import java.io._
-import junit.framework.Assert._
+import org.junit.Assert._
 import java.util.{Collections, Arrays}
 import org.junit._
 import org.scalatest.junit.JUnitSuite

View File

@@ -20,7 +20,7 @@ package kafka.log
 import java.nio._
 import org.junit._
 import org.scalatest.junit.JUnitSuite
-import junit.framework.Assert._
+import org.junit.Assert._

 class OffsetMapTest extends JUnitSuite {

View File

@@ -18,7 +18,7 @@
 package kafka.message
 import java.io.RandomAccessFile
-import junit.framework.Assert._
+import org.junit.Assert._
 import kafka.utils.TestUtils._
 import kafka.log.FileMessageSet
 import org.scalatest.junit.JUnitSuite

View File

@@ -19,7 +19,7 @@ package kafka.message
 import java.nio._
 import java.util.concurrent.atomic.AtomicLong
-import junit.framework.Assert._
+import org.junit.Assert._
 import org.junit.Test
 import kafka.utils.TestUtils

View File

@@ -21,7 +21,7 @@ import java.io.ByteArrayOutputStream
 import scala.collection._
 import org.scalatest.junit.JUnitSuite
 import org.junit._
-import junit.framework.Assert._
+import org.junit.Assert._

 class MessageCompressionTest extends JUnitSuite {

View File

@@ -20,7 +20,7 @@ package kafka.message
 import java.nio._
 import java.util.HashMap
 import scala.collection._
-import junit.framework.Assert._
+import org.junit.Assert._
 import org.scalatest.junit.JUnitSuite
 import org.junit.{Before, Test}
 import kafka.utils.TestUtils

View File

@@ -20,7 +20,7 @@ package kafka.message
 import java.io.{InputStream, ByteArrayInputStream, ByteArrayOutputStream}
 import java.nio.ByteBuffer
 import java.util.Random
-import junit.framework.Assert._
+import org.junit.Assert._
 import org.junit.Test
 import org.scalatest.junit.JUnitSuite

View File

@@ -18,12 +18,11 @@ package kafka.metrics
  */
 import org.junit.Test
-import org.scalatest.junit.JUnit3Suite
 import java.util.concurrent.TimeUnit
-import junit.framework.Assert._
+import org.junit.Assert._
 import com.yammer.metrics.core.{MetricsRegistry, Clock}

-class KafkaTimerTest extends JUnit3Suite {
+class KafkaTimerTest {
   @Test
   def testKafkaTimer() {

View File

@@ -21,11 +21,10 @@ import java.util.Properties
 import com.yammer.metrics.Metrics
 import com.yammer.metrics.core.MetricPredicate
-import org.junit.Test
-import junit.framework.Assert._
+import org.junit.{After, Test}
+import org.junit.Assert._
 import kafka.integration.KafkaServerTestHarness
 import kafka.server._
-import kafka.message._
 import kafka.serializer._
 import kafka.utils._
 import kafka.admin.AdminUtils
@@ -33,9 +32,8 @@ import kafka.utils.TestUtils._
 import scala.collection._
 import scala.collection.JavaConversions._
 import scala.util.matching.Regex
-import org.scalatest.junit.JUnit3Suite

-class MetricsTest extends JUnit3Suite with KafkaServerTestHarness with Logging {
+class MetricsTest extends KafkaServerTestHarness with Logging {
   val numNodes = 2
   val numParts = 2
   val topic = "topic1"
@@ -48,6 +46,7 @@ class MetricsTest extends JUnit3Suite with KafkaServerTestHarness with Logging {
   val nMessages = 2

+  @After
   override def tearDown() {
     super.tearDown()
   }

View File

@@ -17,27 +17,26 @@
 package kafka.network;

-import java.net._
 import java.io._
+import java.net._
+import java.nio.ByteBuffer
+import java.util.Random
+import kafka.api.ProducerRequest
 import kafka.cluster.EndPoint
+import kafka.common.TopicAndPartition
+import kafka.message.ByteBufferMessageSet
+import kafka.producer.SyncProducerConfig
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.NetworkSend
 import org.apache.kafka.common.protocol.SecurityProtocol
 import org.apache.kafka.common.utils.SystemTime
+import org.junit.Assert._
 import org.junit._
-import org.scalatest.junit.JUnitSuite
-import java.util.Random
-import junit.framework.Assert._
-import kafka.producer.SyncProducerConfig
-import kafka.api.ProducerRequest
-import java.nio.ByteBuffer
-import kafka.common.TopicAndPartition
-import kafka.message.ByteBufferMessageSet
-import java.nio.channels.SelectionKey
-import kafka.utils.TestUtils
 import scala.collection.Map

-class SocketServerTest extends JUnitSuite {
+class SocketServerTest {
   val server: SocketServer = new SocketServer(0,
     Map(SecurityProtocol.PLAINTEXT -> EndPoint(null, 0, SecurityProtocol.PLAINTEXT),
@@ -84,11 +83,11 @@ class SocketServerTest extends JUnitSuite {
     new Socket("localhost", server.boundPort(protocol))
   }

   @After
   def cleanup() {
     server.shutdown()
   }

   @Test
   def simpleRequest() {
     val plainSocket = connect(protocol = SecurityProtocol.PLAINTEXT)
@@ -175,7 +174,7 @@ class SocketServerTest extends JUnitSuite {
   }

   @Test
-  def testMaxConnectionsPerIPOverrides(): Unit = {
+  def testMaxConnectionsPerIPOverrides() {
     val overrideNum = 6
     val overrides: Map[String, Int] = Map("localhost" -> overrideNum)
     val overrideServer: SocketServer = new SocketServer(0,

View File

@@ -19,36 +19,27 @@ package kafka.producer
 import java.util.Properties
 import java.util.concurrent.LinkedBlockingQueue
-import junit.framework.Assert._
+import org.junit.Assert._
 import org.easymock.EasyMock
 import org.junit.Test
 import kafka.api._
-import kafka.cluster.{BrokerEndPoint, Broker}
+import kafka.cluster.BrokerEndPoint
 import kafka.common._
 import kafka.message._
 import kafka.producer.async._
 import kafka.serializer._
 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils._
-import org.scalatest.junit.JUnit3Suite
 import scala.collection.Map
 import scala.collection.mutable.ArrayBuffer
 import kafka.utils._

-class AsyncProducerTest extends JUnit3Suite {
+class AsyncProducerTest {
   // One of the few cases we can just set a fixed port because the producer is mocked out here since this uses mocks
   val props = Seq(createBrokerConfig(1, "127.0.0.1:1", port=65534))
   val configs = props.map(KafkaConfig.fromProps)
   val brokerList = configs.map(c => org.apache.kafka.common.utils.Utils.formatAddress(c.hostName, c.port)).mkString(",")

-  override def setUp() {
-    super.setUp()
-  }
-
-  override def tearDown() {
-    super.tearDown()
-  }

   @Test
   def testProducerQueueSize() {
     // a mock event handler that blocks

View File

@@ -17,26 +17,24 @@
 package kafka.producer
 
-import org.scalatest.TestFailedException
-import org.scalatest.junit.JUnit3Suite
+import java.util
+import java.util.Properties
+
+import kafka.admin.AdminUtils
+import kafka.api.FetchRequestBuilder
+import kafka.common.{ErrorMapping, FailedToSendMessageException}
 import kafka.consumer.SimpleConsumer
 import kafka.message.Message
+import kafka.serializer.StringEncoder
 import kafka.server.{KafkaConfig, KafkaRequestHandler, KafkaServer}
+import kafka.utils._
 import kafka.zk.ZooKeeperTestHarness
 import org.apache.log4j.{Level, Logger}
-import org.junit.Test
-import kafka.utils._
-import java.util
-import kafka.admin.AdminUtils
-import util.Properties
-import kafka.api.FetchRequestBuilder
-import org.junit.Assert.assertTrue
-import org.junit.Assert.assertFalse
-import org.junit.Assert.assertEquals
-import kafka.common.{ErrorMapping, FailedToSendMessageException}
-import kafka.serializer.StringEncoder
+import org.junit.Assert._
+import org.junit.{After, Before, Test}
+import org.scalatest.exceptions.TestFailedException
 
-class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
+class ProducerTest extends ZooKeeperTestHarness with Logging{
   private val brokerId1 = 0
   private val brokerId2 = 1
   private var server1: KafkaServer = null
@@ -60,6 +58,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
     consumer2
   }
 
+  @Before
   override def setUp() {
     super.setUp()
     // set up 2 brokers with 4 partitions each
@@ -81,6 +80,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
     requestHandlerLogger.setLevel(Level.FATAL)
   }
 
+  @After
   override def tearDown() {
     // restore set request handler logger to a higher level
     requestHandlerLogger.setLevel(Level.ERROR)
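
Note: the recurring pattern in these harness subclasses is to re-annotate the overridden lifecycle method and chain to the parent explicitly: JUnit 4 invokes an overridden @Before method only once, so super.setUp() must be called by hand, and tearDown releases test state before shutting the harness down. A sketch under those assumptions (hypothetical BarTest; TestUtils.tempDir is the helper used elsewhere in this tree):

    import java.io.File
    import kafka.utils.TestUtils
    import kafka.zk.ZooKeeperTestHarness
    import org.junit.Assert._
    import org.junit.{After, Before, Test}

    class BarTest extends ZooKeeperTestHarness {
      var logDir: File = null

      @Before
      override def setUp() {
        super.setUp()                  // bring up the embedded ZooKeeper first
        logDir = TestUtils.tempDir()   // then allocate per-test state
      }

      @Test
      def testLogDirExists() {
        assertTrue(logDir.exists)
      }

      @After
      override def tearDown() {
        logDir.delete()                // release per-test state first
        super.tearDown()               // then shut the embedded ZooKeeper down
      }
    }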


@@ -20,7 +20,7 @@ package kafka.producer
 
 import java.net.SocketTimeoutException
 import java.util.Properties
-import junit.framework.Assert
+import org.junit.Assert
 import kafka.admin.AdminUtils
 import kafka.api.ProducerResponseStatus
 import kafka.common.{ErrorMapping, TopicAndPartition}
@@ -30,9 +30,8 @@ import kafka.server.KafkaConfig
 import kafka.utils._
 import org.apache.kafka.common.protocol.SecurityProtocol
 import org.junit.Test
-import org.scalatest.junit.JUnit3Suite
 
-class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
+class SyncProducerTest extends KafkaServerTestHarness {
   private val messageBytes = new Array[Byte](2)
   // turning off controlled shutdown since testProducerCanTimeout() explicitly shuts down request handler pool.
   def generateConfigs() = List(KafkaConfig.fromProps(TestUtils.createBrokerConfigs(1, zkConnect, false).head))


@@ -17,18 +17,19 @@
 package kafka.server
 
-import junit.framework.Assert._
+import org.junit.Assert._
 import kafka.utils.{TestUtils, CoreUtils, ZkUtils}
 import kafka.zk.ZooKeeperTestHarness
 import org.apache.kafka.common.protocol.SecurityProtocol
-import org.scalatest.junit.JUnit3Suite
+import org.junit.{After, Before}
 
-class AdvertiseBrokerTest extends JUnit3Suite with ZooKeeperTestHarness {
+class AdvertiseBrokerTest extends ZooKeeperTestHarness {
   var server : KafkaServer = null
   val brokerId = 0
   val advertisedHostName = "routable-host"
   val advertisedPort = 1234
 
+  @Before
   override def setUp() {
     super.setUp()
@@ -39,6 +40,7 @@ class AdvertiseBrokerTest extends JUnit3Suite with ZooKeeperTestHarness {
     server = TestUtils.createServer(KafkaConfig.fromProps(props))
   }
 
+  @After
   override def tearDown() {
     server.shutdown()
     CoreUtils.rm(server.config.logDirs)


@@ -17,22 +17,21 @@
 package kafka.server
 
-import org.junit.Test
-import org.scalatest.junit.JUnit3Suite
-import junit.framework.Assert._
+import org.junit.{After, Before, Test}
+import org.junit.Assert._
 
-class DelayedOperationTest extends JUnit3Suite {
+class DelayedOperationTest {
 
   var purgatory: DelayedOperationPurgatory[MockDelayedOperation] = null
 
-  override def setUp() {
-    super.setUp()
+  @Before
+  def setUp() {
     purgatory = new DelayedOperationPurgatory[MockDelayedOperation](purgatoryName = "mock")
   }
 
-  override def tearDown() {
+  @After
+  def tearDown() {
     purgatory.shutdown()
-    super.tearDown()
   }
 
   @Test


@@ -26,9 +26,8 @@ import kafka.utils._
 import kafka.common._
 import kafka.log.LogConfig
 import kafka.admin.{AdminOperationException, AdminUtils}
-import org.scalatest.junit.JUnit3Suite
 
-class DynamicConfigChangeTest extends JUnit3Suite with KafkaServerTestHarness {
+class DynamicConfigChangeTest extends KafkaServerTestHarness {
   def generateConfigs() = List(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect)))
 
   @Test


@@ -19,7 +19,6 @@ package kafka.server
 import kafka.log._
 import java.io.File
 import org.I0Itec.zkclient.ZkClient
-import org.scalatest.junit.JUnit3Suite
 import org.easymock.EasyMock
 import org.junit._
 import org.junit.Assert._
@@ -28,7 +27,7 @@ import kafka.cluster.Replica
 import kafka.utils.{SystemTime, KafkaScheduler, TestUtils, MockTime, CoreUtils}
 import java.util.concurrent.atomic.AtomicBoolean
 
-class HighwatermarkPersistenceTest extends JUnit3Suite {
+class HighwatermarkPersistenceTest {
 
   val configs = TestUtils.createBrokerConfigs(2, TestUtils.MockZkConnect).map(KafkaConfig.fromProps)
   val topic = "foo"


@@ -18,7 +18,7 @@ package kafka.server
 
 import java.util.Properties
 
-import org.scalatest.junit.JUnit3Suite
+import org.junit.{Before, After}
 import collection.mutable.HashMap
 import collection.mutable.Map
 import kafka.cluster.{Partition, Replica}
@@ -30,7 +30,7 @@ import java.util.concurrent.atomic.AtomicBoolean
 import kafka.message.MessageSet
 
-class IsrExpirationTest extends JUnit3Suite {
+class IsrExpirationTest {
 
   var topicPartitionIsr: Map[(String, Int), Seq[Int]] = new HashMap[(String, Int), Seq[Int]]()
   val replicaLagTimeMaxMs = 100L
@@ -46,14 +46,14 @@ class IsrExpirationTest extends JUnit3Suite {
   var replicaManager: ReplicaManager = null
 
-  override def setUp() {
-    super.setUp()
+  @Before
+  def setUp() {
     replicaManager = new ReplicaManager(configs.head, time, null, null, null, new AtomicBoolean(false))
   }
 
-  override def tearDown() {
+  @After
+  def tearDown() {
     replicaManager.shutdown(false)
-    super.tearDown()
   }
 
   /*


@@ -26,9 +26,9 @@ import kafka.utils.{TestUtils, CoreUtils}
 import org.apache.kafka.common.config.ConfigException
 import org.apache.kafka.common.protocol.SecurityProtocol
 import org.junit.{Assert, Test}
-import org.scalatest.junit.JUnit3Suite
+import org.scalatest.Assertions.intercept
 
-class KafkaConfigTest extends JUnit3Suite {
+class KafkaConfigTest {
 
   @Test
   def testLogRetentionTimeHoursProvided() {
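
Note: alongside the JUnit3Suite removal, KafkaConfigTest now imports ScalaTest's intercept directly; it replaces the try/fail idiom for expected exceptions. A sketch of its use, assuming a hypothetical invalidProps value:

    import org.apache.kafka.common.config.ConfigException
    import org.scalatest.Assertions.intercept

    // Fails the test unless the block throws a ConfigException,
    // and hands back the caught exception for further checks.
    val thrown = intercept[ConfigException] {
      KafkaConfig.fromProps(invalidProps)   // invalidProps: hypothetical bad Properties
    }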


@@ -17,7 +17,7 @@
 package kafka.server
 
-import junit.framework.Assert._
+import org.junit.Assert._
 import kafka.api._
 import kafka.utils.{TestUtils, ZkUtils, CoreUtils}
 import kafka.cluster.Broker
@@ -26,9 +26,9 @@ import kafka.controller.{ControllerChannelManager, ControllerContext, LeaderIsrA
 import kafka.utils.TestUtils._
 import kafka.zk.ZooKeeperTestHarness
 import org.apache.kafka.common.protocol.SecurityProtocol
-import org.scalatest.junit.JUnit3Suite
+import org.junit.{After, Before}
 
-class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
+class LeaderElectionTest extends ZooKeeperTestHarness {
   val brokerId1 = 0
   val brokerId2 = 1
@@ -36,6 +36,7 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
   var staleControllerEpochDetected = false
 
+  @Before
   override def setUp() {
     super.setUp()
@@ -48,6 +49,7 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
     servers ++= List(server1, server2)
   }
 
+  @After
   override def tearDown() {
     servers.foreach(_.shutdown())
     servers.foreach(server => CoreUtils.rm(server.config.logDirs))


@@ -19,12 +19,11 @@ package kafka.server
 import java.io.File
 
 import kafka.utils._
-import junit.framework.Assert._
+import org.junit.Assert._
 import java.util.{Random, Properties}
 import kafka.consumer.SimpleConsumer
 import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message}
 import kafka.zk.ZooKeeperTestHarness
-import org.scalatest.junit.JUnit3Suite
 import kafka.admin.AdminUtils
 import kafka.api.{PartitionOffsetRequestInfo, FetchRequestBuilder, OffsetRequest}
 import kafka.utils.TestUtils._
@@ -33,7 +32,7 @@ import org.junit.After
 import org.junit.Before
 import org.junit.Test
 
-class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness {
+class LogOffsetTest extends ZooKeeperTestHarness {
   val random = new Random()
   var logDir: File = null
   var topicLogDir: File = null


@@ -27,10 +27,10 @@ import kafka.serializer.StringEncoder
 import java.io.File
 
-import org.scalatest.junit.JUnit3Suite
+import org.junit.{After, Before}
 import org.junit.Assert._
 
-class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
+class LogRecoveryTest extends ZooKeeperTestHarness {
 
   val replicaLagTimeMaxMs = 5000L
   val replicaLagMaxMessages = 10L
@@ -69,6 +69,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
     keyEncoder = classOf[IntEncoder].getName)
   }
 
+  @Before
   override def setUp() {
     super.setUp()
@@ -86,6 +87,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
     updateProducer()
   }
 
+  @After
   override def tearDown() {
     producer.close()
     for(server <- servers) {


@@ -25,7 +25,6 @@ import kafka.utils.TestUtils._
 import kafka.zk.ZooKeeperTestHarness
 import org.junit.{After, Before, Test}
-import org.scalatest.junit.JUnit3Suite
 
 import java.util.Properties
 import java.io.File
@@ -33,9 +32,9 @@ import java.io.File
 import scala.util.Random
 import scala.collection._
-import junit.framework.Assert._
+import org.junit.Assert._
 
-class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
+class OffsetCommitTest extends ZooKeeperTestHarness {
   val random: Random = new Random()
   val group = "test-group"
   val retentionCheckInterval: Long = 100L


@@ -17,7 +17,7 @@
 package kafka.server
 
-import org.scalatest.junit.JUnit3Suite
+import org.junit.{After, Before}
 import kafka.zk.ZooKeeperTestHarness
 import kafka.utils.TestUtils._
 import kafka.producer.KeyedMessage
@@ -25,11 +25,12 @@ import kafka.serializer.StringEncoder
 import kafka.utils.{TestUtils}
 import kafka.common._
 
-class ReplicaFetchTest extends JUnit3Suite with ZooKeeperTestHarness {
+class ReplicaFetchTest extends ZooKeeperTestHarness {
   var brokers: Seq[KafkaServer] = null
   val topic1 = "foo"
   val topic2 = "bar"
 
+  @Before
   override def setUp() {
     super.setUp()
     brokers = createBrokerConfigs(2, zkConnect, false)
@@ -37,6 +38,7 @@ class ReplicaFetchTest extends JUnit3Suite with ZooKeeperTestHarness {
       .map(config => TestUtils.createServer(config))
   }
 
+  @After
   override def tearDown() {
     brokers.foreach(_.shutdown())
     super.tearDown()


@@ -27,12 +27,11 @@ import java.io.File
 import org.apache.kafka.common.protocol.Errors
 import org.easymock.EasyMock
 import org.I0Itec.zkclient.ZkClient
-import org.scalatest.junit.JUnit3Suite
 import org.junit.Test
 
 import scala.collection.Map
 
-class ReplicaManagerTest extends JUnit3Suite {
+class ReplicaManagerTest {
 
   val topic = "test-topic"
@@ -84,7 +83,7 @@ class ReplicaManagerTest extends JUnit3Suite {
     rm.appendMessages(timeout = 0, requiredAcks = 3, internalTopicsAllowed = false, messagesPerPartition = produceRequest.data, responseCallback = callback)
 
-    rm.shutdown(false);
+    rm.shutdown(false)
 
     TestUtils.verifyNonDaemonThreadsStatus


@@ -20,18 +20,18 @@ import java.util.Properties
 import kafka.zk.ZooKeeperTestHarness
 import kafka.utils.{TestUtils, CoreUtils}
-import org.junit.Test
-import org.scalatest.junit.JUnit3Suite
-import junit.framework.Assert._
+import org.junit.{Before, Test}
+import org.junit.Assert._
 import java.io.File
 
-class ServerGenerateBrokerIdTest extends JUnit3Suite with ZooKeeperTestHarness {
+class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness {
   var props1: Properties = null
   var config1: KafkaConfig = null
   var props2: Properties = null
   var config2: KafkaConfig = null
   val brokerMetaPropsFile = "meta.properties"
 
+  @Before
   override def setUp() {
     super.setUp()
     props1 = TestUtils.createBrokerConfig(-1, zkConnect)


@@ -27,18 +27,18 @@ import kafka.serializer.StringEncoder
 import java.io.File
 
-import org.junit.Test
-import org.scalatest.junit.JUnit3Suite
-import junit.framework.Assert._
+import org.junit.{Before, Test}
+import org.junit.Assert._
 
-class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
+class ServerShutdownTest extends ZooKeeperTestHarness {
   var config: KafkaConfig = null
   val host = "localhost"
   val topic = "test"
   val sent1 = List("hello", "there")
   val sent2 = List("more", "messages")
 
-  override def setUp(): Unit = {
+  @Before
+  override def setUp() {
     super.setUp()
     val props = TestUtils.createBrokerConfig(0, zkConnect)
     config = KafkaConfig.fromProps(props)


@@ -17,15 +17,14 @@
 package kafka.server
 
-import org.scalatest.junit.JUnit3Suite
 import kafka.utils.ZkUtils
 import kafka.utils.CoreUtils
 import kafka.utils.TestUtils
 import kafka.zk.ZooKeeperTestHarness
-import junit.framework.Assert._
+import org.junit.Assert._
 
-class ServerStartupTest extends JUnit3Suite with ZooKeeperTestHarness {
+class ServerStartupTest extends ZooKeeperTestHarness {
 
   def testBrokerCreatesZKChroot {
     val brokerId = 0


@@ -22,18 +22,17 @@ import kafka.cluster.Replica
 import kafka.common.TopicAndPartition
 import kafka.log.Log
 import kafka.message.{MessageSet, ByteBufferMessageSet, Message}
+import org.junit.{After, Before}
 
-import scala.Some
 import java.util.{Properties, Collections}
 import java.util.concurrent.atomic.AtomicBoolean
 import collection.JavaConversions._
 
 import org.easymock.EasyMock
 import org.I0Itec.zkclient.ZkClient
-import org.scalatest.junit.JUnit3Suite
-import junit.framework.Assert._
+import org.junit.Assert._
 
-class SimpleFetchTest extends JUnit3Suite {
+class SimpleFetchTest {
 
   val replicaLagTimeMaxMs = 100L
   val replicaFetchWaitMaxMs = 100
@@ -63,9 +62,8 @@ class SimpleFetchTest extends JUnit3Suite {
 
   var replicaManager: ReplicaManager = null
 
-  override def setUp() {
-    super.setUp()
+  @Before
+  def setUp() {
     // create nice mock since we don't particularly care about zkclient calls
     val zkClient = EasyMock.createNiceMock(classOf[ZkClient])
     EasyMock.replay(zkClient)
@@ -117,9 +115,9 @@ class SimpleFetchTest extends JUnit3Suite {
     partition.inSyncReplicas = allReplicas.toSet
   }
 
-  override def tearDown() {
+  @After
+  def tearDown() {
     replicaManager.shutdown(false)
-    super.tearDown()
   }
 
   /**


@@ -19,7 +19,7 @@ package kafka.utils
 
 import java.util.concurrent.TimeUnit
 
-import junit.framework.Assert._
+import org.junit.Assert._
 import org.junit.Test
 
 class ByteBoundedBlockingQueueTest {


@@ -17,7 +17,7 @@
 package kafka.utils
 
-import junit.framework.Assert._
+import org.junit.Assert._
 import org.junit.Test
 
 class CommandLineUtilsTest {


@@ -16,7 +16,7 @@
  */
 package kafka.utils
 
-import junit.framework.Assert._
+import org.junit.Assert._
 import org.scalatest.Assertions
 import org.junit.{Test, After, Before}


@@ -16,7 +16,7 @@
  */
 package kafka.utils
 
-import junit.framework.Assert._
+import org.junit.Assert._
 import org.junit.{Test, After, Before}
 
 class JsonTest {


@@ -22,13 +22,12 @@ import kafka.server.{ReplicaFetcherManager, KafkaConfig}
 import kafka.api.LeaderAndIsr
 import kafka.zk.ZooKeeperTestHarness
 import kafka.common.TopicAndPartition
-import org.scalatest.junit.JUnit3Suite
 import org.junit.Assert._
-import org.junit.Test
+import org.junit.{Before, Test}
 import org.easymock.EasyMock
 
-class ReplicationUtilsTest extends JUnit3Suite with ZooKeeperTestHarness {
+class ReplicationUtilsTest extends ZooKeeperTestHarness {
   val topic = "my-topic-test"
   val partitionId = 0
   val brokerId = 1
@@ -45,7 +44,7 @@ class ReplicationUtilsTest extends JUnit3Suite with ZooKeeperTestHarness {
   val topicDataLeaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(LeaderAndIsr(1,leaderEpoch,List(1,2),0), controllerEpoch)
 
+  @Before
   override def setUp() {
     super.setUp()
     ZkUtils.createPersistentPath(zkClient,topicPath,topicData)


@@ -16,7 +16,7 @@
  */
 package kafka.utils
 
-import junit.framework.Assert._
+import org.junit.Assert._
 import java.util.concurrent.atomic._
 import org.junit.{Test, After, Before}
 import kafka.utils.TestUtils.retry


@@ -43,8 +43,7 @@ import kafka.admin.AdminUtils
 import kafka.producer.ProducerConfig
 import kafka.log._
-import junit.framework.AssertionFailedError
-import junit.framework.Assert._
+import org.junit.Assert._
 import org.apache.kafka.clients.producer.KafkaProducer
 
 import scala.collection.Map
@@ -595,7 +594,7 @@ object TestUtils extends Logging {
         block
         return
       } catch {
-        case e: AssertionFailedError =>
+        case e: AssertionError =>
           val ellapsed = System.currentTimeMillis - startTime
           if(ellapsed > maxWaitMs) {
             throw e
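
Note: org.junit.Assert signals failure with java.lang.AssertionError rather than JUnit 3's junit.framework.AssertionFailedError, so the catch clause in TestUtils.retry changes in step with the import swap above. A sketch of how the helper is invoked, with the curried signature assumed from this excerpt and someCondition as a hypothetical predicate:

    import org.junit.Assert.assertTrue

    // Re-runs the block until its assertions stop throwing AssertionError
    // or maxWaitMs elapses, at which point the last failure is rethrown.
    TestUtils.retry(maxWaitMs = 15000) {
      assertTrue("condition should eventually hold", someCondition())   // someCondition: hypothetical
    }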


@@ -16,7 +16,7 @@
  */
 package kafka.utils.timer
 
-import junit.framework.Assert._
+import org.junit.Assert._
 import java.util.concurrent.atomic._
 import org.junit.{Test, After, Before}


@@ -18,7 +18,7 @@ package kafka.utils.timer
 
 import java.util.concurrent.{CountDownLatch, ExecutorService, Executors, TimeUnit}
 
-import junit.framework.Assert._
+import org.junit.Assert._
 import java.util.concurrent.atomic._
 import org.junit.{Test, After, Before}


@@ -18,13 +18,11 @@
 package kafka.zk
 
 import kafka.consumer.ConsumerConfig
-import org.I0Itec.zkclient.ZkClient
 import kafka.utils.ZkUtils
 import kafka.utils.TestUtils
 import org.junit.Assert
-import org.scalatest.junit.JUnit3Suite
 
-class ZKEphemeralTest extends JUnit3Suite with ZooKeeperTestHarness {
+class ZKEphemeralTest extends ZooKeeperTestHarness {
   var zkSessionTimeoutMs = 1000
 
   def testEphemeralNodeCleanup = {


@@ -17,13 +17,12 @@
 package kafka.zk
 
-import junit.framework.Assert
 import kafka.consumer.ConsumerConfig
 import kafka.utils.{ZkPath, TestUtils, ZkUtils}
 import org.apache.kafka.common.config.ConfigException
-import org.scalatest.junit.JUnit3Suite
+import org.junit.Assert._
 
-class ZKPathTest extends JUnit3Suite with ZooKeeperTestHarness {
+class ZKPathTest extends ZooKeeperTestHarness {
 
   val path: String = "/some_dir"
   val zkSessionTimeoutMs = 1000
@@ -54,7 +53,7 @@ class ZKPathTest extends JUnit3Suite with ZooKeeperTestHarness {
       case exception: Throwable => fail("Failed to create persistent path")
     }
 
-    Assert.assertTrue("Failed to create persistent path", ZkUtils.pathExists(zkClient, path))
+    assertTrue("Failed to create persistent path", ZkUtils.pathExists(zkClient, path))
   }
 
   def testMakeSurePersistsPathExistsThrowsException {
@@ -82,7 +81,7 @@ class ZKPathTest extends JUnit3Suite with ZooKeeperTestHarness {
       case exception: Throwable => fail("Failed to create persistent path")
     }
 
-    Assert.assertTrue("Failed to create persistent path", ZkUtils.pathExists(zkClient, path))
+    assertTrue("Failed to create persistent path", ZkUtils.pathExists(zkClient, path))
  }
 
   def testCreateEphemeralPathThrowsException {
@@ -110,7 +109,7 @@ class ZKPathTest extends JUnit3Suite with ZooKeeperTestHarness {
       case exception: Throwable => fail("Failed to create ephemeral path")
     }
 
-    Assert.assertTrue("Failed to create ephemeral path", ZkUtils.pathExists(zkClient, path))
+    assertTrue("Failed to create ephemeral path", ZkUtils.pathExists(zkClient, path))
   }
 
   def testCreatePersistentSequentialThrowsException {
@@ -140,6 +139,6 @@ class ZKPathTest extends JUnit3Suite with ZooKeeperTestHarness {
       case exception: Throwable => fail("Failed to create persistent path")
     }
 
-    Assert.assertTrue("Failed to create persistent path", ZkUtils.pathExists(zkClient, actualPath))
+    assertTrue("Failed to create persistent path", ZkUtils.pathExists(zkClient, actualPath))
   }
 }
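
Note: with org.junit.Assert._ imported at the top of the file, the qualified Assert.assertTrue calls above shed their prefix; the wildcard brings the static assertion members into scope unqualified:

    import org.junit.Assert._   // assertTrue, assertEquals, fail, ...

    assertTrue("Failed to create persistent path", ZkUtils.pathExists(zkClient, path))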


@@ -17,11 +17,12 @@
 package kafka.zk
 
-import org.scalatest.junit.JUnit3Suite
 import org.I0Itec.zkclient.ZkClient
 import kafka.utils.{ZkUtils, CoreUtils}
+import org.junit.{After, Before}
+import org.scalatest.junit.JUnitSuite
 
-trait ZooKeeperTestHarness extends JUnit3Suite {
+trait ZooKeeperTestHarness extends JUnitSuite {
   var zkPort: Int = -1
   var zookeeper: EmbeddedZookeeper = null
   var zkClient: ZkClient = null
@@ -30,17 +31,17 @@ trait ZooKeeperTestHarness extends JUnit3Suite {
   def zkConnect: String = "127.0.0.1:" + zkPort
 
-  override def setUp() {
-    super.setUp
+  @Before
+  def setUp() {
     zookeeper = new EmbeddedZookeeper()
     zkPort = zookeeper.port
     zkClient = ZkUtils.createZkClient(zkConnect, zkSessionTimeout, zkConnectionTimeout)
   }
 
-  override def tearDown() {
+  @After
+  def tearDown() {
     CoreUtils.swallow(zkClient.close())
     CoreUtils.swallow(zookeeper.shutdown())
-    super.tearDown
   }
 }
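
Note: the trait at the heart of this migration now extends ScalaTest's JUnitSuite, whose JUnit 4 runner discovers the inherited @Before/@After methods reflectively, so a subclass that needs no extra fixture gets ZooKeeper lifecycle management for free. A minimal sketch (hypothetical QuickZkTest; zkClient and ZkUtils.pathExists as used in the tests above):

    import kafka.utils.ZkUtils
    import kafka.zk.ZooKeeperTestHarness
    import org.junit.Assert._
    import org.junit.Test

    class QuickZkTest extends ZooKeeperTestHarness {
      @Test
      def testZkClientIsUsable() {
        // zkClient was created by the inherited @Before setUp()
        assertTrue(ZkUtils.pathExists(zkClient, "/"))
      }
    }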