Merge remote-tracking branch 'origin/trunk' into copycat

This commit is contained in:
Ewen Cheslack-Postava 2015-08-12 20:34:52 -07:00
commit c0e5fdcff5
87 changed files with 280 additions and 297 deletions

View File

@ -19,6 +19,7 @@ import org.apache.kafka.clients.consumer._
import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
import org.apache.kafka.common.TopicPartition
import org.junit.Assert._
import org.junit.Before
import scala.collection.JavaConversions._
@ -52,6 +53,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
.map(KafkaConfig.fromProps(_, serverConfig))
}
@Before
override def setUp() {
super.setUp()

View File

@ -25,6 +25,7 @@ import kafka.server.KafkaConfig
import java.util.ArrayList
import org.junit.Assert._
import org.junit.Before
import scala.collection.JavaConverters._
import kafka.coordinator.ConsumerCoordinator
@ -56,6 +57,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
@Before
override def setUp() {
super.setUp()

View File

@ -25,6 +25,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.KafkaProducer
import kafka.server.{OffsetManager, KafkaConfig}
import kafka.integration.KafkaServerTestHarness
import org.junit.{After, Before}
import scala.collection.mutable.Buffer
import kafka.coordinator.ConsumerCoordinator
@ -49,6 +50,7 @@ trait IntegrationTestHarness extends KafkaServerTestHarness {
cfgs.map(KafkaConfig.fromProps)
}
@Before
override def setUp() {
super.setUp()
producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapUrl)
@ -70,7 +72,8 @@ trait IntegrationTestHarness extends KafkaServerTestHarness {
servers,
servers(0).consumerCoordinator.offsetsTopicConfigs)
}
@After
override def tearDown() {
producers.foreach(_.close())
consumers.foreach(_.close())

View File

@ -22,7 +22,7 @@ import kafka.utils.{ShutdownableThread, TestUtils}
import org.apache.kafka.clients.producer._
import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
import org.junit.Assert._
import org.junit.Test
import org.junit.{After, Before, Test}
class ProducerBounceTest extends KafkaServerTestHarness {
private val producerBufferSize = 30000
@ -62,6 +62,7 @@ class ProducerBounceTest extends KafkaServerTestHarness {
private val topic1 = "topic-1"
private val topic2 = "topic-2"
@Before
override def setUp() {
super.setUp()
@ -70,6 +71,7 @@ class ProducerBounceTest extends KafkaServerTestHarness {
producer3 = TestUtils.createNewProducer(brokerList, acks = -1, blockOnBufferFull = false, bufferSize = producerBufferSize)
}
@After
override def tearDown() {
if (producer1 != null) producer1.close
if (producer2 != null) producer2.close

View File

@ -19,7 +19,6 @@ package kafka.api.test
import java.util.{Properties, Collection, ArrayList}
import org.scalatest.junit.JUnit3Suite
import org.junit.runners.Parameterized
import org.junit.runner.RunWith
import org.junit.runners.Parameterized.Parameters
@ -36,7 +35,7 @@ import kafka.utils.{CoreUtils, TestUtils}
@RunWith(value = classOf[Parameterized])
class ProducerCompressionTest(compression: String) extends JUnit3Suite with ZooKeeperTestHarness {
class ProducerCompressionTest(compression: String) extends ZooKeeperTestHarness {
private val brokerId = 0
private var server: KafkaServer = null

View File

@ -17,22 +17,20 @@
package kafka.api
import org.junit.Test
import org.junit.Assert._
import java.util.concurrent.{ExecutionException, TimeUnit, TimeoutException}
import java.util.{Properties, Random}
import java.util.concurrent.{TimeoutException, TimeUnit, ExecutionException}
import kafka.common.Topic
import kafka.consumer.SimpleConsumer
import kafka.integration.KafkaServerTestHarness
import kafka.server.KafkaConfig
import kafka.utils.{ShutdownableThread, TestUtils}
import org.apache.kafka.common.KafkaException
import org.apache.kafka.common.errors.{InvalidTopicException, NotEnoughReplicasException, NotEnoughReplicasAfterAppendException}
import org.apache.kafka.clients.producer._
import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
import org.apache.kafka.common.KafkaException
import org.apache.kafka.common.errors.{InvalidTopicException, NotEnoughReplicasAfterAppendException, NotEnoughReplicasException}
import org.junit.Assert._
import org.junit.{After, Before, Test}
class ProducerFailureHandlingTest extends KafkaServerTestHarness {
private val producerBufferSize = 30000
@ -61,6 +59,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
private val topic1 = "topic-1"
private val topic2 = "topic-2"
@Before
override def setUp() {
super.setUp()
@ -69,6 +68,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
producer3 = TestUtils.createNewProducer(brokerList, acks = -1, blockOnBufferFull = false, bufferSize = producerBufferSize)
}
@After
override def tearDown() {
if (producer1 != null) producer1.close
if (producer2 != null) producer2.close

View File

@ -30,11 +30,9 @@ import org.apache.kafka.common.config.ConfigException
import org.apache.kafka.common.errors.SerializationException
import org.apache.kafka.common.serialization.ByteArraySerializer
import org.junit.Assert._
import org.junit.Test
import org.scalatest.junit.JUnit3Suite
import org.junit.{After, Before, Test}
class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness {
class ProducerSendTest extends KafkaServerTestHarness {
val numServers = 2
val overridingProps = new Properties()
@ -49,6 +47,7 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness {
private val topic = "topic"
private val numRecords = 100
@Before
override def setUp() {
super.setUp()
@ -57,6 +56,7 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness {
consumer2 = new SimpleConsumer("localhost", servers(1).boundPort(), 100, 1024*1024, "")
}
@After
override def tearDown() {
consumer1.close()
consumer2.close()

View File

@ -21,7 +21,7 @@ import java.security.Permission
import kafka.server.KafkaConfig
import org.junit.{After, Before, Test}
import junit.framework.Assert._
import org.junit.Assert._
class KafkaTest {

View File

@ -17,17 +17,17 @@
package kafka.admin
import org.junit.Assert._
import org.apache.kafka.common.protocol.SecurityProtocol
import org.scalatest.junit.JUnit3Suite
import kafka.zk.ZooKeeperTestHarness
import kafka.utils.TestUtils._
import junit.framework.Assert._
import kafka.utils.{ZkUtils, CoreUtils, TestUtils}
import kafka.cluster.Broker
import kafka.client.ClientUtils
import kafka.server.{KafkaConfig, KafkaServer}
import org.junit.{After, Before}
class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness {
class AddPartitionsTest extends ZooKeeperTestHarness {
var configs: Seq[KafkaConfig] = null
var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
var brokers: Seq[Broker] = Seq.empty[Broker]
@ -39,6 +39,7 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness {
val topic3 = "new-topic3"
val topic4 = "new-topic4"
@Before
override def setUp() {
super.setUp()
@ -54,6 +55,7 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness {
createTopic(zkClient, topic4, partitionReplicaAssignment = Map(0->Seq(0,3)), servers = servers)
}
@After
override def tearDown() {
servers.foreach(_.shutdown())
servers.foreach(server => CoreUtils.rm(server.config.logDirs))

View File

@ -18,7 +18,6 @@ package kafka.admin
import junit.framework.Assert._
import org.junit.Test
import org.scalatest.junit.JUnit3Suite
import java.util.Properties
import kafka.utils._
import kafka.log._
@ -30,7 +29,7 @@ import java.io.File
import TestUtils._
class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
class AdminTest extends ZooKeeperTestHarness with Logging {
@Test
def testReplicaAssignment() {

View File

@ -19,15 +19,10 @@ package kafka.admin
import junit.framework.Assert._
import kafka.admin.ConfigCommand.ConfigCommandOptions
import org.junit.Test
import org.scalatest.junit.JUnit3Suite
import kafka.utils.Logging
import kafka.utils.TestUtils
import kafka.zk.ZooKeeperTestHarness
import kafka.server.{ConfigType, OffsetManager, KafkaConfig}
import kafka.admin.TopicCommand.TopicCommandOptions
import kafka.utils.ZkUtils
class ConfigCommandTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
@Test
def testArgumentParse() {
// Should parse correctly

View File

@ -16,7 +16,6 @@
*/
package kafka.admin
import org.scalatest.junit.JUnit3Suite
import kafka.utils._
import kafka.server.KafkaConfig
import org.junit.Test
@ -25,7 +24,7 @@ import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer}
import kafka.integration.KafkaServerTestHarness
class DeleteConsumerGroupTest extends JUnit3Suite with KafkaServerTestHarness {
class DeleteConsumerGroupTest extends KafkaServerTestHarness {
def generateConfigs() = TestUtils.createBrokerConfigs(3, zkConnect, false, true).map(KafkaConfig.fromProps)
@Test

View File

@ -17,7 +17,6 @@
package kafka.admin
import kafka.log.Log
import org.scalatest.junit.JUnit3Suite
import kafka.zk.ZooKeeperTestHarness
import junit.framework.Assert._
import kafka.utils.{ZkUtils, TestUtils}
@ -26,7 +25,7 @@ import org.junit.Test
import java.util.Properties
import kafka.common.{TopicAlreadyMarkedForDeletionException, TopicAndPartition}
class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
class DeleteTopicTest extends ZooKeeperTestHarness {
@Test
def testDeleteTopicWithAllAliveReplicas() {

View File

@ -18,16 +18,15 @@ package kafka.admin
import junit.framework.Assert._
import org.junit.Test
import org.scalatest.junit.JUnit3Suite
import kafka.utils.Logging
import kafka.utils.TestUtils
import kafka.zk.ZooKeeperTestHarness
import kafka.server.{ConfigType, OffsetManager, KafkaConfig}
import kafka.server.ConfigType
import kafka.admin.TopicCommand.TopicCommandOptions
import kafka.utils.ZkUtils
import kafka.coordinator.ConsumerCoordinator
class TopicCommandTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
class TopicCommandTest extends ZooKeeperTestHarness with Logging {
@Test
def testConfigPreservationAcrossPartitionAlteration() {

View File

@ -19,7 +19,7 @@ package kafka.api
import org.junit._
import org.scalatest.junit.JUnitSuite
import junit.framework.Assert._
import org.junit.Assert._
import scala.util.Random
import java.nio.ByteBuffer
import kafka.common.KafkaException

View File

@ -32,7 +32,7 @@ import java.nio.ByteBuffer
import org.apache.kafka.common.protocol.SecurityProtocol
import org.junit._
import org.scalatest.junit.JUnitSuite
import junit.framework.Assert._
import org.junit.Assert._
object SerializationTestUtils {

View File

@ -22,11 +22,10 @@ import java.nio.ByteBuffer
import kafka.utils.Logging
import org.apache.kafka.common.protocol.SecurityProtocol
import org.junit.Test
import org.scalatest.junit.JUnit3Suite
import scala.collection.mutable
class BrokerEndPointTest extends JUnit3Suite with Logging {
class BrokerEndPointTest extends Logging {
@Test
def testSerDe() = {

View File

@ -17,7 +17,7 @@
package kafka.common
import junit.framework.Assert._
import org.junit.Assert._
import collection.mutable.ArrayBuffer
import org.junit.Test
import kafka.producer.ProducerConfig

View File

@ -17,7 +17,7 @@
package kafka.common
import junit.framework.Assert._
import org.junit.Assert._
import collection.mutable.ArrayBuffer
import org.junit.Test

View File

@ -18,22 +18,20 @@
package kafka.consumer
import java.util.Properties
import java.util.concurrent._
import java.util.concurrent.atomic._
import scala.collection._
import junit.framework.Assert._
import org.junit.Assert._
import kafka.message._
import kafka.server._
import kafka.utils.TestUtils._
import kafka.utils._
import org.junit.Test
import org.junit.{Before, Test}
import kafka.serializer._
import org.scalatest.junit.JUnit3Suite
import kafka.integration.KafkaServerTestHarness
class ConsumerIteratorTest extends JUnit3Suite with KafkaServerTestHarness {
class ConsumerIteratorTest extends KafkaServerTestHarness {
val numNodes = 1
@ -49,6 +47,7 @@ class ConsumerIteratorTest extends JUnit3Suite with KafkaServerTestHarness {
def consumerConfig = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer0))
@Before
override def setUp() {
super.setUp()
topicInfos = configs.map(c => new PartitionTopicInfo(topic,

View File

@ -17,18 +17,17 @@
package kafka.consumer
import org.scalatest.junit.JUnit3Suite
import org.easymock.EasyMock
import org.I0Itec.zkclient.ZkClient
import org.apache.zookeeper.data.Stat
import kafka.utils.{TestUtils, Logging, ZkUtils, Json}
import junit.framework.Assert._
import org.junit.Assert._
import kafka.common.TopicAndPartition
import kafka.consumer.PartitionAssignorTest.StaticSubscriptionInfo
import kafka.consumer.PartitionAssignorTest.Scenario
import kafka.consumer.PartitionAssignorTest.WildcardSubscriptionInfo
class PartitionAssignorTest extends JUnit3Suite with Logging {
class PartitionAssignorTest extends Logging {
def testRoundRobinPartitionAssignor() {
val assignor = new RoundRobinAssignor

View File

@ -18,7 +18,7 @@
package kafka.consumer
import junit.framework.Assert._
import org.junit.Assert._
import org.scalatest.junit.JUnitSuite
import org.junit.Test
import kafka.server.OffsetManager

View File

@ -19,7 +19,7 @@ package kafka.consumer
import java.util.{Collections, Properties}
import junit.framework.Assert._
import org.junit.Assert._
import kafka.common.MessageStreamsExistException
import kafka.integration.KafkaServerTestHarness
import kafka.javaapi.consumer.ConsumerRebalanceListener
@ -30,11 +30,11 @@ import kafka.utils.TestUtils._
import kafka.utils._
import org.I0Itec.zkclient.ZkClient
import org.apache.log4j.{Level, Logger}
import org.scalatest.junit.JUnit3Suite
import org.junit.{After, Before}
import scala.collection._
class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with Logging {
class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging {
val RebalanceBackoffMs = 5000
var dirs : ZKGroupTopicDirs = null
@ -54,11 +54,13 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
val consumer3 = "consumer3"
val nMessages = 2
@Before
override def setUp() {
super.setUp()
dirs = new ZKGroupTopicDirs(group, topic)
}
@After
override def tearDown() {
super.tearDown()
}

View File

@ -20,10 +20,10 @@ package kafka.coordinator
import java.util.concurrent.TimeUnit
import junit.framework.Assert._
import org.junit.Assert._
import kafka.common.TopicAndPartition
import kafka.server.{OffsetManager, ReplicaManager, KafkaConfig}
import kafka.utils.{KafkaScheduler, TestUtils}
import kafka.server.{OffsetManager, KafkaConfig}
import kafka.utils.TestUtils
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.JoinGroupRequest
import org.easymock.EasyMock

View File

@ -17,7 +17,7 @@
package kafka.coordinator
import junit.framework.Assert._
import org.junit.Assert._
import org.junit.{Before, Test}
import org.scalatest.junit.JUnitSuite

View File

@ -20,7 +20,7 @@ package kafka.coordinator
import kafka.server.KafkaConfig
import kafka.utils.{ZkUtils, TestUtils}
import junit.framework.Assert._
import org.junit.Assert._
import org.I0Itec.zkclient.{IZkDataListener, ZkClient}
import org.apache.zookeeper.data.Stat
import org.easymock.EasyMock

View File

@ -19,7 +19,7 @@ package kafka.coordinator
import kafka.common.TopicAndPartition
import junit.framework.Assert._
import org.junit.Assert._
import org.junit.Test
import org.scalatest.junit.JUnitSuite

View File

@ -24,12 +24,11 @@ import kafka.utils.TestUtils
import kafka.serializer._
import kafka.producer.{Producer, KeyedMessage}
import org.junit.Test
import org.junit.{After, Before, Test}
import org.apache.log4j.{Level, Logger}
import org.scalatest.junit.JUnit3Suite
import junit.framework.Assert._
import org.junit.Assert._
class AutoOffsetResetTest extends JUnit3Suite with KafkaServerTestHarness with Logging {
class AutoOffsetResetTest extends KafkaServerTestHarness with Logging {
def generateConfigs() = List(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect)))
@ -42,12 +41,14 @@ class AutoOffsetResetTest extends JUnit3Suite with KafkaServerTestHarness with L
val requestHandlerLogger = Logger.getLogger(classOf[kafka.server.KafkaRequestHandler])
@Before
override def setUp() {
super.setUp()
// temporarily set request handler logger to a higher level
requestHandlerLogger.setLevel(Level.FATAL)
}
@After
override def tearDown() {
// restore set request handler logger to a higher level
requestHandlerLogger.setLevel(Level.ERROR)

View File

@ -19,18 +19,17 @@ package kafka.integration
import java.util.concurrent._
import java.util.concurrent.atomic._
import org.junit.{After, Before}
import scala.collection._
import junit.framework.Assert._
import org.junit.Assert._
import kafka.cluster._
import kafka.server._
import org.scalatest.junit.JUnit3Suite
import kafka.consumer._
import kafka.serializer._
import kafka.producer.{KeyedMessage, Producer}
import kafka.utils.TestUtils
class FetcherTest extends JUnit3Suite with KafkaServerTestHarness {
class FetcherTest extends KafkaServerTestHarness {
val numNodes = 1
def generateConfigs() = TestUtils.createBrokerConfigs(numNodes, zkConnect).map(KafkaConfig.fromProps)
@ -40,6 +39,7 @@ class FetcherTest extends JUnit3Suite with KafkaServerTestHarness {
var fetcher: ConsumerFetcherManager = null
@Before
override def setUp() {
super.setUp
TestUtils.createTopic(zkClient, topic, partitionReplicaAssignment = Map(0 -> Seq(configs.head.brokerId)), servers = servers)
@ -59,6 +59,7 @@ class FetcherTest extends JUnit3Suite with KafkaServerTestHarness {
fetcher.startConnections(topicInfos, cluster)
}
@After
override def tearDown() {
fetcher.stopConnections()
super.tearDown

View File

@ -19,17 +19,18 @@ package kafka.integration
import java.util.Arrays
import scala.collection.mutable.Buffer
import kafka.common.KafkaException
import kafka.server._
import kafka.utils.{CoreUtils, TestUtils}
import org.scalatest.junit.JUnit3Suite
import kafka.zk.ZooKeeperTestHarness
import kafka.common.KafkaException
import org.junit.{After, Before}
import scala.collection.mutable.Buffer
/**
* A test harness that brings up some number of broker nodes
*/
trait KafkaServerTestHarness extends JUnit3Suite with ZooKeeperTestHarness {
trait KafkaServerTestHarness extends ZooKeeperTestHarness {
var instanceConfigs: Seq[KafkaConfig] = null
var servers: Buffer[KafkaServer] = null
var brokerList: String = null
@ -51,7 +52,7 @@ trait KafkaServerTestHarness extends JUnit3Suite with ZooKeeperTestHarness {
def bootstrapUrl = servers.map(s => s.config.hostName + ":" + s.boundPort()).mkString(",")
@Before
override def setUp() {
super.setUp
if(configs.size <= 0)
@ -62,6 +63,7 @@ trait KafkaServerTestHarness extends JUnit3Suite with ZooKeeperTestHarness {
Arrays.fill(alive, true)
}
@After
override def tearDown() {
servers.foreach(_.shutdown())
servers.foreach(_.config.logDirs.foreach(CoreUtils.rm(_)))

View File

@ -21,9 +21,8 @@ import java.util.Properties
import kafka.server.KafkaConfig
import kafka.utils.TestUtils
import org.scalatest.junit.JUnit3Suite
class MinIsrConfigTest extends JUnit3Suite with KafkaServerTestHarness {
class MinIsrConfigTest extends KafkaServerTestHarness {
val overridingProps = new Properties()
overridingProps.put(KafkaConfig.MinInSyncReplicasProp, "5")

View File

@ -18,13 +18,12 @@
package kafka.integration
import java.nio.ByteBuffer
import junit.framework.Assert._
import org.junit.Assert._
import kafka.api.{PartitionFetchInfo, FetchRequest, FetchRequestBuilder}
import kafka.server.{KafkaRequestHandler, KafkaConfig}
import kafka.producer.{KeyedMessage, Producer}
import org.apache.log4j.{Level, Logger}
import kafka.zk.ZooKeeperTestHarness
import org.scalatest.junit.JUnit3Suite
import scala.collection._
import kafka.common.{TopicAndPartition, ErrorMapping, UnknownTopicOrPartitionException, OffsetOutOfRangeException}
import kafka.utils.{StaticPartitioner, TestUtils, CoreUtils}
@ -34,7 +33,7 @@ import java.util.Properties
/**
* End to end tests of the primitive apis against a local server
*/
class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with ZooKeeperTestHarness {
class PrimitiveApiTest extends ProducerConsumerTestHarness with ZooKeeperTestHarness {
val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
def generateConfigs() = List(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect)))

View File

@ -5,8 +5,8 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@ -18,28 +18,30 @@
package kafka.integration
import kafka.consumer.SimpleConsumer
import org.scalatest.junit.JUnit3Suite
import org.junit.{After, Before}
import kafka.producer.Producer
import kafka.utils.{StaticPartitioner, TestUtils}
import kafka.serializer.StringEncoder
trait ProducerConsumerTestHarness extends JUnit3Suite with KafkaServerTestHarness {
val host = "localhost"
var producer: Producer[String, String] = null
var consumer: SimpleConsumer = null
trait ProducerConsumerTestHarness extends KafkaServerTestHarness {
val host = "localhost"
var producer: Producer[String, String] = null
var consumer: SimpleConsumer = null
@Before
override def setUp() {
super.setUp
producer = TestUtils.createProducer[String, String](TestUtils.getBrokerListStrFromServers(servers),
encoder = classOf[StringEncoder].getName,
keyEncoder = classOf[StringEncoder].getName,
partitioner = classOf[StaticPartitioner].getName)
consumer = new SimpleConsumer(host, servers(0).boundPort(), 1000000, 64*1024, "")
}
super.setUp
producer = TestUtils.createProducer[String, String](TestUtils.getBrokerListStrFromServers(servers),
encoder = classOf[StringEncoder].getName,
keyEncoder = classOf[StringEncoder].getName,
partitioner = classOf[StaticPartitioner].getName)
consumer = new SimpleConsumer(host, servers(0).boundPort(), 1000000, 64 * 1024, "")
}
override def tearDown() {
producer.close()
consumer.close()
super.tearDown
}
@After
override def tearDown() {
producer.close()
consumer.close()
super.tearDown
}
}

View File

@ -17,18 +17,19 @@
package kafka.integration
import org.scalatest.junit.JUnit3Suite
import org.junit.{After, Before}
import kafka.zk.ZooKeeperTestHarness
import kafka.utils.TestUtils._
import junit.framework.Assert._
import org.junit.Assert._
import kafka.utils.{CoreUtils, TestUtils}
import kafka.server.{KafkaConfig, KafkaServer}
class RollingBounceTest extends JUnit3Suite with ZooKeeperTestHarness {
class RollingBounceTest extends ZooKeeperTestHarness {
val partitionId = 0
var servers: Seq[KafkaServer] = null
@Before
override def setUp() {
super.setUp()
// controlled.shutdown.enable is true by default
@ -39,6 +40,7 @@ class RollingBounceTest extends JUnit3Suite with ZooKeeperTestHarness {
servers = configs.map(c => TestUtils.createServer(KafkaConfig.fromProps(c)))
}
@After
override def tearDown() {
servers.foreach(_.shutdown())
servers.foreach(server => CoreUtils.rm(server.config.logDirs))

View File

@ -19,9 +19,8 @@ package kafka.integration
import java.nio.ByteBuffer
import junit.framework.Assert._
import kafka.admin.AdminUtils
import kafka.api.{TopicMetadataResponse, TopicMetadataRequest}
import kafka.api.{TopicMetadataRequest, TopicMetadataResponse}
import kafka.client.ClientUtils
import kafka.cluster.{Broker, BrokerEndPoint}
import kafka.common.ErrorMapping
@ -30,14 +29,16 @@ import kafka.utils.TestUtils
import kafka.utils.TestUtils._
import kafka.zk.ZooKeeperTestHarness
import org.apache.kafka.common.protocol.SecurityProtocol
import org.scalatest.junit.JUnit3Suite
import org.junit.Assert._
import org.junit.{After, Before}
class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
class TopicMetadataTest extends ZooKeeperTestHarness {
private var server1: KafkaServer = null
var brokerEndPoints: Seq[BrokerEndPoint] = null
var adHocConfigs: Seq[KafkaConfig] = null
val numConfigs: Int = 4
@Before
override def setUp() {
super.setUp()
val props = createBrokerConfigs(numConfigs, zkConnect)
@ -47,6 +48,7 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
brokerEndPoints = Seq(new Broker(server1.config.brokerId, server1.config.hostName, server1.boundPort()).getBrokerEndPoint(SecurityProtocol.PLAINTEXT))
}
@After
override def tearDown() {
server1.shutdown()
super.tearDown()

View File

@ -18,24 +18,22 @@
package kafka.integration
import org.apache.kafka.common.config.ConfigException
import org.junit.{After, Before}
import scala.collection.mutable.MutableList
import scala.util.Random
import org.apache.log4j.{Level, Logger}
import org.scalatest.junit.JUnit3Suite
import java.util.Properties
import junit.framework.Assert._
import kafka.admin.AdminUtils
import kafka.common.FailedToSendMessageException
import kafka.consumer.{Consumer, ConsumerConfig, ConsumerTimeoutException}
import kafka.producer.{KeyedMessage, Producer}
import kafka.consumer.{Consumer, ConsumerConfig}
import kafka.serializer.StringDecoder
import kafka.server.{KafkaConfig, KafkaServer}
import kafka.utils.CoreUtils
import kafka.utils.TestUtils._
import kafka.zk.ZooKeeperTestHarness
import org.junit.Assert._
class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
val brokerId1 = 0
val brokerId2 = 1
@ -58,6 +56,7 @@ class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
val syncProducerLogger = Logger.getLogger(classOf[kafka.producer.SyncProducer])
val eventHandlerLogger = Logger.getLogger(classOf[kafka.producer.async.DefaultEventHandler[Object, Object]])
@Before
override def setUp() {
super.setUp()
@ -77,6 +76,7 @@ class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
eventHandlerLogger.setLevel(Level.FATAL)
}
@After
override def tearDown() {
servers.foreach(server => shutdownServer(server))
servers.foreach(server => CoreUtils.rm(server.config.logDirs))

View File

@ -20,7 +20,6 @@ package kafka.javaapi.consumer
import java.util.Properties
import kafka.server._
import kafka.message._
import kafka.serializer._
import kafka.integration.KafkaServerTestHarness
import kafka.producer.KeyedMessage
@ -33,12 +32,11 @@ import kafka.common.MessageStreamsExistException
import scala.collection.JavaConversions
import org.scalatest.junit.JUnit3Suite
import org.apache.log4j.{Level, Logger}
import junit.framework.Assert._
import org.junit.Assert._
class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with ZooKeeperTestHarness with Logging {
class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with ZooKeeperTestHarness with Logging {
val numNodes = 2
val numParts = 2
val topic = "topic1"

View File

@ -17,7 +17,7 @@
package kafka.javaapi.message
import junit.framework.Assert._
import org.junit.Assert._
import org.scalatest.junit.JUnitSuite
import org.junit.Test
import kafka.utils.TestUtils

View File

@ -17,7 +17,7 @@
package kafka.javaapi.message
import junit.framework.Assert._
import org.junit.Assert._
import org.junit.Test
import kafka.message.{DefaultCompressionCodec, CompressionCodec, NoCompressionCodec, Message}

View File

@ -17,19 +17,20 @@
package kafka.log
import java.util.Properties
import junit.framework.Assert._
import org.scalatest.junit.JUnitSuite
import org.junit.{After, Test}
import java.nio._
import java.io.File
import scala.collection._
import kafka.common._
import kafka.utils._
import kafka.message._
import java.nio._
import java.util.Properties
import java.util.concurrent.atomic.AtomicLong
import kafka.common._
import kafka.message._
import kafka.utils._
import org.apache.kafka.common.utils.Utils
import org.junit.Assert._
import org.junit.{After, Test}
import org.scalatest.junit.JUnitSuite
import scala.collection._
/**
* Unit tests for the log cleaning logic

View File

@ -20,7 +20,7 @@ package kafka.log
import java.io._
import java.nio._
import java.util.concurrent.atomic._
import junit.framework.Assert._
import org.junit.Assert._
import kafka.utils.TestUtils._
import kafka.message._
import org.junit.Test

View File

@ -30,16 +30,14 @@ import org.junit._
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import org.junit.runners.Parameterized.Parameters
import org.scalatest.junit.JUnit3Suite
import scala.collection._
/**
* This is an integration test that tests the fully integrated log cleaner
*/
@RunWith(value = classOf[Parameterized])
class LogCleanerIntegrationTest(compressionCodec: String) extends JUnit3Suite {
class LogCleanerIntegrationTest(compressionCodec: String) {
val time = new MockTime()
val segmentSize = 100

View File

@ -21,9 +21,9 @@ import java.util.Properties
import org.apache.kafka.common.config.ConfigException
import org.junit.{Assert, Test}
import org.scalatest.junit.JUnit3Suite
import org.scalatest.Assertions._
class LogConfigTest extends JUnit3Suite {
class LogConfigTest {
@Test
def testFromPropsEmpty() {

View File

@ -19,14 +19,14 @@ package kafka.log
import java.io._
import java.util.Properties
import junit.framework.Assert._
import org.junit.Test
import org.scalatest.junit.JUnit3Suite
import kafka.server.{BrokerState, OffsetCheckpoint}
import kafka.common._
import kafka.utils._
class LogManagerTest extends JUnit3Suite {
import kafka.common._
import kafka.server.OffsetCheckpoint
import kafka.utils._
import org.junit.Assert._
import org.junit.{After, Before, Test}
class LogManagerTest {
val time: MockTime = new MockTime()
val maxRollInterval = 100
@ -41,20 +41,20 @@ class LogManagerTest extends JUnit3Suite {
val name = "kafka"
val veryLargeLogFlushInterval = 10000000L
override def setUp() {
super.setUp()
@Before
def setUp() {
logDir = TestUtils.tempDir()
logManager = createLogManager()
logManager.startup
logDir = logManager.logDirs(0)
}
override def tearDown() {
@After
def tearDown() {
if(logManager != null)
logManager.shutdown()
CoreUtils.rm(logDir)
logManager.logDirs.foreach(CoreUtils.rm(_))
super.tearDown()
}
/**

View File

@ -16,19 +16,15 @@
*/
package kafka.log
import junit.framework.Assert._
import org.junit.Assert._
import java.util.concurrent.atomic._
import java.io.File
import java.io.RandomAccessFile
import java.util.Random
import org.junit.{Test, After}
import org.scalatest.junit.JUnit3Suite
import kafka.utils.TestUtils
import kafka.message._
import kafka.utils.SystemTime
import scala.collection._
class LogSegmentTest extends JUnit3Suite {
class LogSegmentTest {
val segments = mutable.ArrayBuffer[LogSegment]()

View File

@ -20,7 +20,7 @@ package kafka.log
import java.io._
import java.util.Properties
import java.util.concurrent.atomic._
import junit.framework.Assert._
import org.junit.Assert._
import org.scalatest.junit.JUnitSuite
import org.junit.{After, Before, Test}
import kafka.message._

View File

@ -18,7 +18,7 @@
package kafka.log
import java.io._
import junit.framework.Assert._
import org.junit.Assert._
import java.util.{Collections, Arrays}
import org.junit._
import org.scalatest.junit.JUnitSuite

View File

@ -20,7 +20,7 @@ package kafka.log
import java.nio._
import org.junit._
import org.scalatest.junit.JUnitSuite
import junit.framework.Assert._
import org.junit.Assert._
class OffsetMapTest extends JUnitSuite {

View File

@ -18,7 +18,7 @@
package kafka.message
import java.io.RandomAccessFile
import junit.framework.Assert._
import org.junit.Assert._
import kafka.utils.TestUtils._
import kafka.log.FileMessageSet
import org.scalatest.junit.JUnitSuite

View File

@ -19,7 +19,7 @@ package kafka.message
import java.nio._
import java.util.concurrent.atomic.AtomicLong
import junit.framework.Assert._
import org.junit.Assert._
import org.junit.Test
import kafka.utils.TestUtils

View File

@ -21,7 +21,7 @@ import java.io.ByteArrayOutputStream
import scala.collection._
import org.scalatest.junit.JUnitSuite
import org.junit._
import junit.framework.Assert._
import org.junit.Assert._
class MessageCompressionTest extends JUnitSuite {

View File

@ -20,7 +20,7 @@ package kafka.message
import java.nio._
import java.util.HashMap
import scala.collection._
import junit.framework.Assert._
import org.junit.Assert._
import org.scalatest.junit.JUnitSuite
import org.junit.{Before, Test}
import kafka.utils.TestUtils

View File

@ -20,7 +20,7 @@ package kafka.message
import java.io.{InputStream, ByteArrayInputStream, ByteArrayOutputStream}
import java.nio.ByteBuffer
import java.util.Random
import junit.framework.Assert._
import org.junit.Assert._
import org.junit.Test
import org.scalatest.junit.JUnitSuite

View File

@ -18,12 +18,11 @@ package kafka.metrics
*/
import org.junit.Test
import org.scalatest.junit.JUnit3Suite
import java.util.concurrent.TimeUnit
import junit.framework.Assert._
import org.junit.Assert._
import com.yammer.metrics.core.{MetricsRegistry, Clock}
class KafkaTimerTest extends JUnit3Suite {
class KafkaTimerTest {
@Test
def testKafkaTimer() {

View File

@ -21,11 +21,10 @@ import java.util.Properties
import com.yammer.metrics.Metrics
import com.yammer.metrics.core.MetricPredicate
import org.junit.Test
import junit.framework.Assert._
import org.junit.{After, Test}
import org.junit.Assert._
import kafka.integration.KafkaServerTestHarness
import kafka.server._
import kafka.message._
import kafka.serializer._
import kafka.utils._
import kafka.admin.AdminUtils
@ -33,9 +32,8 @@ import kafka.utils.TestUtils._
import scala.collection._
import scala.collection.JavaConversions._
import scala.util.matching.Regex
import org.scalatest.junit.JUnit3Suite
class MetricsTest extends JUnit3Suite with KafkaServerTestHarness with Logging {
class MetricsTest extends KafkaServerTestHarness with Logging {
val numNodes = 2
val numParts = 2
val topic = "topic1"
@ -48,6 +46,7 @@ class MetricsTest extends JUnit3Suite with KafkaServerTestHarness with Logging {
val nMessages = 2
@After
override def tearDown() {
super.tearDown()
}

View File

@ -17,27 +17,26 @@
package kafka.network;
import java.net._
import java.io._
import java.net._
import java.nio.ByteBuffer
import java.util.Random
import kafka.api.ProducerRequest
import kafka.cluster.EndPoint
import kafka.common.TopicAndPartition
import kafka.message.ByteBufferMessageSet
import kafka.producer.SyncProducerConfig
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.NetworkSend
import org.apache.kafka.common.protocol.SecurityProtocol
import org.apache.kafka.common.utils.SystemTime
import org.junit.Assert._
import org.junit._
import org.scalatest.junit.JUnitSuite
import java.util.Random
import junit.framework.Assert._
import kafka.producer.SyncProducerConfig
import kafka.api.ProducerRequest
import java.nio.ByteBuffer
import kafka.common.TopicAndPartition
import kafka.message.ByteBufferMessageSet
import java.nio.channels.SelectionKey
import kafka.utils.TestUtils
import scala.collection.Map
class SocketServerTest extends JUnitSuite {
class SocketServerTest {
val server: SocketServer = new SocketServer(0,
Map(SecurityProtocol.PLAINTEXT -> EndPoint(null, 0, SecurityProtocol.PLAINTEXT),
@ -84,11 +83,11 @@ class SocketServerTest extends JUnitSuite {
new Socket("localhost", server.boundPort(protocol))
}
@After
def cleanup() {
server.shutdown()
}
@Test
def simpleRequest() {
val plainSocket = connect(protocol = SecurityProtocol.PLAINTEXT)
@ -175,7 +174,7 @@ class SocketServerTest extends JUnitSuite {
}
@Test
def testMaxConnectionsPerIPOverrides(): Unit = {
def testMaxConnectionsPerIPOverrides() {
val overrideNum = 6
val overrides: Map[String, Int] = Map("localhost" -> overrideNum)
val overrideServer: SocketServer = new SocketServer(0,

View File

@ -19,36 +19,27 @@ package kafka.producer
import java.util.Properties
import java.util.concurrent.LinkedBlockingQueue
import junit.framework.Assert._
import org.junit.Assert._
import org.easymock.EasyMock
import org.junit.Test
import kafka.api._
import kafka.cluster.{BrokerEndPoint, Broker}
import kafka.cluster.BrokerEndPoint
import kafka.common._
import kafka.message._
import kafka.producer.async._
import kafka.serializer._
import kafka.server.KafkaConfig
import kafka.utils.TestUtils._
import org.scalatest.junit.JUnit3Suite
import scala.collection.Map
import scala.collection.mutable.ArrayBuffer
import kafka.utils._
class AsyncProducerTest extends JUnit3Suite {
class AsyncProducerTest {
// One of the few cases we can just set a fixed port because the producer is mocked out here since this uses mocks
val props = Seq(createBrokerConfig(1, "127.0.0.1:1", port=65534))
val configs = props.map(KafkaConfig.fromProps)
val brokerList = configs.map(c => org.apache.kafka.common.utils.Utils.formatAddress(c.hostName, c.port)).mkString(",")
override def setUp() {
super.setUp()
}
override def tearDown() {
super.tearDown()
}
@Test
def testProducerQueueSize() {
// a mock event handler that blocks

View File

@ -17,26 +17,24 @@
package kafka.producer
import org.scalatest.TestFailedException
import org.scalatest.junit.JUnit3Suite
import java.util
import java.util.Properties
import kafka.admin.AdminUtils
import kafka.api.FetchRequestBuilder
import kafka.common.{ErrorMapping, FailedToSendMessageException}
import kafka.consumer.SimpleConsumer
import kafka.message.Message
import kafka.serializer.StringEncoder
import kafka.server.{KafkaConfig, KafkaRequestHandler, KafkaServer}
import kafka.utils._
import kafka.zk.ZooKeeperTestHarness
import org.apache.log4j.{Level, Logger}
import org.junit.Test
import kafka.utils._
import java.util
import kafka.admin.AdminUtils
import util.Properties
import kafka.api.FetchRequestBuilder
import org.junit.Assert.assertTrue
import org.junit.Assert.assertFalse
import org.junit.Assert.assertEquals
import kafka.common.{ErrorMapping, FailedToSendMessageException}
import kafka.serializer.StringEncoder
import org.junit.Assert._
import org.junit.{After, Before, Test}
import org.scalatest.exceptions.TestFailedException
class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
class ProducerTest extends ZooKeeperTestHarness with Logging{
private val brokerId1 = 0
private val brokerId2 = 1
private var server1: KafkaServer = null
@ -60,6 +58,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
consumer2
}
@Before
override def setUp() {
super.setUp()
// set up 2 brokers with 4 partitions each
@ -81,6 +80,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
requestHandlerLogger.setLevel(Level.FATAL)
}
@After
override def tearDown() {
// restore set request handler logger to a higher level
requestHandlerLogger.setLevel(Level.ERROR)

View File

@ -20,7 +20,7 @@ package kafka.producer
import java.net.SocketTimeoutException
import java.util.Properties
import junit.framework.Assert
import org.junit.Assert
import kafka.admin.AdminUtils
import kafka.api.ProducerResponseStatus
import kafka.common.{ErrorMapping, TopicAndPartition}
@ -30,9 +30,8 @@ import kafka.server.KafkaConfig
import kafka.utils._
import org.apache.kafka.common.protocol.SecurityProtocol
import org.junit.Test
import org.scalatest.junit.JUnit3Suite
class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
class SyncProducerTest extends KafkaServerTestHarness {
private val messageBytes = new Array[Byte](2)
// turning off controlled shutdown since testProducerCanTimeout() explicitly shuts down request handler pool.
def generateConfigs() = List(KafkaConfig.fromProps(TestUtils.createBrokerConfigs(1, zkConnect, false).head))

View File

@ -17,18 +17,19 @@
package kafka.server
import junit.framework.Assert._
import org.junit.Assert._
import kafka.utils.{TestUtils, CoreUtils, ZkUtils}
import kafka.zk.ZooKeeperTestHarness
import org.apache.kafka.common.protocol.SecurityProtocol
import org.scalatest.junit.JUnit3Suite
import org.junit.{After, Before}
class AdvertiseBrokerTest extends JUnit3Suite with ZooKeeperTestHarness {
class AdvertiseBrokerTest extends ZooKeeperTestHarness {
var server : KafkaServer = null
val brokerId = 0
val advertisedHostName = "routable-host"
val advertisedPort = 1234
@Before
override def setUp() {
super.setUp()
@ -39,6 +40,7 @@ class AdvertiseBrokerTest extends JUnit3Suite with ZooKeeperTestHarness {
server = TestUtils.createServer(KafkaConfig.fromProps(props))
}
@After
override def tearDown() {
server.shutdown()
CoreUtils.rm(server.config.logDirs)

View File

@ -17,22 +17,21 @@
package kafka.server
import org.junit.Test
import org.scalatest.junit.JUnit3Suite
import junit.framework.Assert._
import org.junit.{After, Before, Test}
import org.junit.Assert._
class DelayedOperationTest extends JUnit3Suite {
class DelayedOperationTest {
var purgatory: DelayedOperationPurgatory[MockDelayedOperation] = null
override def setUp() {
super.setUp()
@Before
def setUp() {
purgatory = new DelayedOperationPurgatory[MockDelayedOperation](purgatoryName = "mock")
}
override def tearDown() {
@After
def tearDown() {
purgatory.shutdown()
super.tearDown()
}
@Test

View File

@ -26,9 +26,8 @@ import kafka.utils._
import kafka.common._
import kafka.log.LogConfig
import kafka.admin.{AdminOperationException, AdminUtils}
import org.scalatest.junit.JUnit3Suite
class DynamicConfigChangeTest extends JUnit3Suite with KafkaServerTestHarness {
class DynamicConfigChangeTest extends KafkaServerTestHarness {
def generateConfigs() = List(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect)))
@Test

View File

@ -19,7 +19,6 @@ package kafka.server
import kafka.log._
import java.io.File
import org.I0Itec.zkclient.ZkClient
import org.scalatest.junit.JUnit3Suite
import org.easymock.EasyMock
import org.junit._
import org.junit.Assert._
@ -28,7 +27,7 @@ import kafka.cluster.Replica
import kafka.utils.{SystemTime, KafkaScheduler, TestUtils, MockTime, CoreUtils}
import java.util.concurrent.atomic.AtomicBoolean
class HighwatermarkPersistenceTest extends JUnit3Suite {
class HighwatermarkPersistenceTest {
val configs = TestUtils.createBrokerConfigs(2, TestUtils.MockZkConnect).map(KafkaConfig.fromProps)
val topic = "foo"

View File

@ -18,7 +18,7 @@ package kafka.server
import java.util.Properties
import org.scalatest.junit.JUnit3Suite
import org.junit.{Before, After}
import collection.mutable.HashMap
import collection.mutable.Map
import kafka.cluster.{Partition, Replica}
@ -30,7 +30,7 @@ import java.util.concurrent.atomic.AtomicBoolean
import kafka.message.MessageSet
class IsrExpirationTest extends JUnit3Suite {
class IsrExpirationTest {
var topicPartitionIsr: Map[(String, Int), Seq[Int]] = new HashMap[(String, Int), Seq[Int]]()
val replicaLagTimeMaxMs = 100L
@ -46,14 +46,14 @@ class IsrExpirationTest extends JUnit3Suite {
var replicaManager: ReplicaManager = null
override def setUp() {
super.setUp()
@Before
def setUp() {
replicaManager = new ReplicaManager(configs.head, time, null, null, null, new AtomicBoolean(false))
}
override def tearDown() {
@After
def tearDown() {
replicaManager.shutdown(false)
super.tearDown()
}
/*

View File

@ -26,9 +26,9 @@ import kafka.utils.{TestUtils, CoreUtils}
import org.apache.kafka.common.config.ConfigException
import org.apache.kafka.common.protocol.SecurityProtocol
import org.junit.{Assert, Test}
import org.scalatest.junit.JUnit3Suite
import org.scalatest.Assertions.intercept
class KafkaConfigTest extends JUnit3Suite {
class KafkaConfigTest {
@Test
def testLogRetentionTimeHoursProvided() {

View File

@ -17,7 +17,7 @@
package kafka.server
import junit.framework.Assert._
import org.junit.Assert._
import kafka.api._
import kafka.utils.{TestUtils, ZkUtils, CoreUtils}
import kafka.cluster.Broker
@ -26,9 +26,9 @@ import kafka.controller.{ControllerChannelManager, ControllerContext, LeaderIsrA
import kafka.utils.TestUtils._
import kafka.zk.ZooKeeperTestHarness
import org.apache.kafka.common.protocol.SecurityProtocol
import org.scalatest.junit.JUnit3Suite
import org.junit.{After, Before}
class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
class LeaderElectionTest extends ZooKeeperTestHarness {
val brokerId1 = 0
val brokerId2 = 1
@ -36,6 +36,7 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
var staleControllerEpochDetected = false
@Before
override def setUp() {
super.setUp()
@ -48,6 +49,7 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
servers ++= List(server1, server2)
}
@After
override def tearDown() {
servers.foreach(_.shutdown())
servers.foreach(server => CoreUtils.rm(server.config.logDirs))

View File

@ -19,12 +19,11 @@ package kafka.server
import java.io.File
import kafka.utils._
import junit.framework.Assert._
import org.junit.Assert._
import java.util.{Random, Properties}
import kafka.consumer.SimpleConsumer
import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message}
import kafka.zk.ZooKeeperTestHarness
import org.scalatest.junit.JUnit3Suite
import kafka.admin.AdminUtils
import kafka.api.{PartitionOffsetRequestInfo, FetchRequestBuilder, OffsetRequest}
import kafka.utils.TestUtils._
@ -33,7 +32,7 @@ import org.junit.After
import org.junit.Before
import org.junit.Test
class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness {
class LogOffsetTest extends ZooKeeperTestHarness {
val random = new Random()
var logDir: File = null
var topicLogDir: File = null

View File

@ -27,10 +27,10 @@ import kafka.serializer.StringEncoder
import java.io.File
import org.scalatest.junit.JUnit3Suite
import org.junit.{After, Before}
import org.junit.Assert._
class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
class LogRecoveryTest extends ZooKeeperTestHarness {
val replicaLagTimeMaxMs = 5000L
val replicaLagMaxMessages = 10L
@ -69,6 +69,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
keyEncoder = classOf[IntEncoder].getName)
}
@Before
override def setUp() {
super.setUp()
@ -86,6 +87,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
updateProducer()
}
@After
override def tearDown() {
producer.close()
for(server <- servers) {

View File

@ -25,7 +25,6 @@ import kafka.utils.TestUtils._
import kafka.zk.ZooKeeperTestHarness
import org.junit.{After, Before, Test}
import org.scalatest.junit.JUnit3Suite
import java.util.Properties
import java.io.File
@ -33,9 +32,9 @@ import java.io.File
import scala.util.Random
import scala.collection._
import junit.framework.Assert._
import org.junit.Assert._
class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
class OffsetCommitTest extends ZooKeeperTestHarness {
val random: Random = new Random()
val group = "test-group"
val retentionCheckInterval: Long = 100L

View File

@ -17,7 +17,7 @@
package kafka.server
import org.scalatest.junit.JUnit3Suite
import org.junit.{After, Before}
import kafka.zk.ZooKeeperTestHarness
import kafka.utils.TestUtils._
import kafka.producer.KeyedMessage
@ -25,11 +25,12 @@ import kafka.serializer.StringEncoder
import kafka.utils.{TestUtils}
import kafka.common._
class ReplicaFetchTest extends JUnit3Suite with ZooKeeperTestHarness {
class ReplicaFetchTest extends ZooKeeperTestHarness {
var brokers: Seq[KafkaServer] = null
val topic1 = "foo"
val topic2 = "bar"
@Before
override def setUp() {
super.setUp()
brokers = createBrokerConfigs(2, zkConnect, false)
@ -37,6 +38,7 @@ class ReplicaFetchTest extends JUnit3Suite with ZooKeeperTestHarness {
.map(config => TestUtils.createServer(config))
}
@After
override def tearDown() {
brokers.foreach(_.shutdown())
super.tearDown()

View File

@ -27,12 +27,11 @@ import java.io.File
import org.apache.kafka.common.protocol.Errors
import org.easymock.EasyMock
import org.I0Itec.zkclient.ZkClient
import org.scalatest.junit.JUnit3Suite
import org.junit.Test
import scala.collection.Map
class ReplicaManagerTest extends JUnit3Suite {
class ReplicaManagerTest {
val topic = "test-topic"
@ -84,7 +83,7 @@ class ReplicaManagerTest extends JUnit3Suite {
rm.appendMessages(timeout = 0, requiredAcks = 3, internalTopicsAllowed = false, messagesPerPartition = produceRequest.data, responseCallback = callback)
rm.shutdown(false);
rm.shutdown(false)
TestUtils.verifyNonDaemonThreadsStatus

View File

@ -20,18 +20,18 @@ import java.util.Properties
import kafka.zk.ZooKeeperTestHarness
import kafka.utils.{TestUtils, CoreUtils}
import org.junit.Test
import org.scalatest.junit.JUnit3Suite
import junit.framework.Assert._
import org.junit.{Before, Test}
import org.junit.Assert._
import java.io.File
class ServerGenerateBrokerIdTest extends JUnit3Suite with ZooKeeperTestHarness {
class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness {
var props1: Properties = null
var config1: KafkaConfig = null
var props2: Properties = null
var config2: KafkaConfig = null
val brokerMetaPropsFile = "meta.properties"
@Before
override def setUp() {
super.setUp()
props1 = TestUtils.createBrokerConfig(-1, zkConnect)

View File

@ -27,18 +27,18 @@ import kafka.serializer.StringEncoder
import java.io.File
import org.junit.Test
import org.scalatest.junit.JUnit3Suite
import junit.framework.Assert._
import org.junit.{Before, Test}
import org.junit.Assert._
class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
class ServerShutdownTest extends ZooKeeperTestHarness {
var config: KafkaConfig = null
val host = "localhost"
val topic = "test"
val sent1 = List("hello", "there")
val sent2 = List("more", "messages")
override def setUp(): Unit = {
@Before
override def setUp() {
super.setUp()
val props = TestUtils.createBrokerConfig(0, zkConnect)
config = KafkaConfig.fromProps(props)

View File

@ -17,15 +17,14 @@
package kafka.server
import org.scalatest.junit.JUnit3Suite
import kafka.utils.ZkUtils
import kafka.utils.CoreUtils
import kafka.utils.TestUtils
import kafka.zk.ZooKeeperTestHarness
import junit.framework.Assert._
import org.junit.Assert._
class ServerStartupTest extends JUnit3Suite with ZooKeeperTestHarness {
class ServerStartupTest extends ZooKeeperTestHarness {
def testBrokerCreatesZKChroot {
val brokerId = 0

View File

@ -22,18 +22,17 @@ import kafka.cluster.Replica
import kafka.common.TopicAndPartition
import kafka.log.Log
import kafka.message.{MessageSet, ByteBufferMessageSet, Message}
import org.junit.{After, Before}
import scala.Some
import java.util.{Properties, Collections}
import java.util.concurrent.atomic.AtomicBoolean
import collection.JavaConversions._
import org.easymock.EasyMock
import org.I0Itec.zkclient.ZkClient
import org.scalatest.junit.JUnit3Suite
import junit.framework.Assert._
import org.junit.Assert._
class SimpleFetchTest extends JUnit3Suite {
class SimpleFetchTest {
val replicaLagTimeMaxMs = 100L
val replicaFetchWaitMaxMs = 100
@ -63,9 +62,8 @@ class SimpleFetchTest extends JUnit3Suite {
var replicaManager: ReplicaManager = null
override def setUp() {
super.setUp()
@Before
def setUp() {
// create nice mock since we don't particularly care about zkclient calls
val zkClient = EasyMock.createNiceMock(classOf[ZkClient])
EasyMock.replay(zkClient)
@ -117,9 +115,9 @@ class SimpleFetchTest extends JUnit3Suite {
partition.inSyncReplicas = allReplicas.toSet
}
override def tearDown() {
@After
def tearDown() {
replicaManager.shutdown(false)
super.tearDown()
}
/**

View File

@ -19,7 +19,7 @@ package kafka.utils
import java.util.concurrent.TimeUnit
import junit.framework.Assert._
import org.junit.Assert._
import org.junit.Test
class ByteBoundedBlockingQueueTest {

View File

@ -17,7 +17,7 @@
package kafka.utils
import junit.framework.Assert._
import org.junit.Assert._
import org.junit.Test
class CommandLineUtilsTest {

View File

@ -16,7 +16,7 @@
*/
package kafka.utils
import junit.framework.Assert._
import org.junit.Assert._
import org.scalatest.Assertions
import org.junit.{Test, After, Before}

View File

@ -16,7 +16,7 @@
*/
package kafka.utils
import junit.framework.Assert._
import org.junit.Assert._
import org.junit.{Test, After, Before}
class JsonTest {

View File

@ -22,13 +22,12 @@ import kafka.server.{ReplicaFetcherManager, KafkaConfig}
import kafka.api.LeaderAndIsr
import kafka.zk.ZooKeeperTestHarness
import kafka.common.TopicAndPartition
import org.scalatest.junit.JUnit3Suite
import org.junit.Assert._
import org.junit.Test
import org.junit.{Before, Test}
import org.easymock.EasyMock
class ReplicationUtilsTest extends JUnit3Suite with ZooKeeperTestHarness {
class ReplicationUtilsTest extends ZooKeeperTestHarness {
val topic = "my-topic-test"
val partitionId = 0
val brokerId = 1
@ -45,7 +44,7 @@ class ReplicationUtilsTest extends JUnit3Suite with ZooKeeperTestHarness {
val topicDataLeaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(LeaderAndIsr(1,leaderEpoch,List(1,2),0), controllerEpoch)
@Before
override def setUp() {
super.setUp()
ZkUtils.createPersistentPath(zkClient,topicPath,topicData)

View File

@ -16,7 +16,7 @@
*/
package kafka.utils
import junit.framework.Assert._
import org.junit.Assert._
import java.util.concurrent.atomic._
import org.junit.{Test, After, Before}
import kafka.utils.TestUtils.retry

View File

@ -43,8 +43,7 @@ import kafka.admin.AdminUtils
import kafka.producer.ProducerConfig
import kafka.log._
import junit.framework.AssertionFailedError
import junit.framework.Assert._
import org.junit.Assert._
import org.apache.kafka.clients.producer.KafkaProducer
import scala.collection.Map
@ -595,7 +594,7 @@ object TestUtils extends Logging {
block
return
} catch {
case e: AssertionFailedError =>
case e: AssertionError =>
val ellapsed = System.currentTimeMillis - startTime
if(ellapsed > maxWaitMs) {
throw e

View File

@ -16,7 +16,7 @@
*/
package kafka.utils.timer
import junit.framework.Assert._
import org.junit.Assert._
import java.util.concurrent.atomic._
import org.junit.{Test, After, Before}

View File

@ -18,7 +18,7 @@ package kafka.utils.timer
import java.util.concurrent.{CountDownLatch, ExecutorService, Executors, TimeUnit}
import junit.framework.Assert._
import org.junit.Assert._
import java.util.concurrent.atomic._
import org.junit.{Test, After, Before}

View File

@ -18,13 +18,11 @@
package kafka.zk
import kafka.consumer.ConsumerConfig
import org.I0Itec.zkclient.ZkClient
import kafka.utils.ZkUtils
import kafka.utils.TestUtils
import org.junit.Assert
import org.scalatest.junit.JUnit3Suite
class ZKEphemeralTest extends JUnit3Suite with ZooKeeperTestHarness {
class ZKEphemeralTest extends ZooKeeperTestHarness {
var zkSessionTimeoutMs = 1000
def testEphemeralNodeCleanup = {

View File

@ -17,13 +17,12 @@
package kafka.zk
import junit.framework.Assert
import kafka.consumer.ConsumerConfig
import kafka.utils.{ZkPath, TestUtils, ZkUtils}
import org.apache.kafka.common.config.ConfigException
import org.scalatest.junit.JUnit3Suite
import org.junit.Assert._
class ZKPathTest extends JUnit3Suite with ZooKeeperTestHarness {
class ZKPathTest extends ZooKeeperTestHarness {
val path: String = "/some_dir"
val zkSessionTimeoutMs = 1000
@ -54,7 +53,7 @@ class ZKPathTest extends JUnit3Suite with ZooKeeperTestHarness {
case exception: Throwable => fail("Failed to create persistent path")
}
Assert.assertTrue("Failed to create persistent path", ZkUtils.pathExists(zkClient, path))
assertTrue("Failed to create persistent path", ZkUtils.pathExists(zkClient, path))
}
def testMakeSurePersistsPathExistsThrowsException {
@ -82,7 +81,7 @@ class ZKPathTest extends JUnit3Suite with ZooKeeperTestHarness {
case exception: Throwable => fail("Failed to create persistent path")
}
Assert.assertTrue("Failed to create persistent path", ZkUtils.pathExists(zkClient, path))
assertTrue("Failed to create persistent path", ZkUtils.pathExists(zkClient, path))
}
def testCreateEphemeralPathThrowsException {
@ -110,7 +109,7 @@ class ZKPathTest extends JUnit3Suite with ZooKeeperTestHarness {
case exception: Throwable => fail("Failed to create ephemeral path")
}
Assert.assertTrue("Failed to create ephemeral path", ZkUtils.pathExists(zkClient, path))
assertTrue("Failed to create ephemeral path", ZkUtils.pathExists(zkClient, path))
}
def testCreatePersistentSequentialThrowsException {
@ -140,6 +139,6 @@ class ZKPathTest extends JUnit3Suite with ZooKeeperTestHarness {
case exception: Throwable => fail("Failed to create persistent path")
}
Assert.assertTrue("Failed to create persistent path", ZkUtils.pathExists(zkClient, actualPath))
assertTrue("Failed to create persistent path", ZkUtils.pathExists(zkClient, actualPath))
}
}

View File

@ -17,11 +17,12 @@
package kafka.zk
import org.scalatest.junit.JUnit3Suite
import org.I0Itec.zkclient.ZkClient
import kafka.utils.{ZkUtils, CoreUtils}
import org.junit.{After, Before}
import org.scalatest.junit.JUnitSuite
trait ZooKeeperTestHarness extends JUnit3Suite {
trait ZooKeeperTestHarness extends JUnitSuite {
var zkPort: Int = -1
var zookeeper: EmbeddedZookeeper = null
var zkClient: ZkClient = null
@ -30,17 +31,17 @@ trait ZooKeeperTestHarness extends JUnit3Suite {
def zkConnect: String = "127.0.0.1:" + zkPort
override def setUp() {
super.setUp
@Before
def setUp() {
zookeeper = new EmbeddedZookeeper()
zkPort = zookeeper.port
zkClient = ZkUtils.createZkClient(zkConnect, zkSessionTimeout, zkConnectionTimeout)
}
override def tearDown() {
@After
def tearDown() {
CoreUtils.swallow(zkClient.close())
CoreUtils.swallow(zookeeper.shutdown())
super.tearDown
}
}