mirror of https://github.com/apache/kafka.git

Merge remote-tracking branch 'origin/trunk' into copycat

commit c0e5fdcff5
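The merged trunk changes below are dominated by one mechanical migration: the Scala tests drop the JUnit 3 scaffolding (org.scalatest.junit.JUnit3Suite, junit.framework.Assert) in favor of JUnit 4 (org.junit.Assert plus annotation-driven @Before/@After/@Test). A minimal before/after sketch of the pattern follows; FooTest is hypothetical and not a file from this commit.

// Before (JUnit 3 via scalatest): lifecycle methods are found by name,
// and the suite must mix in JUnit3Suite.
// class FooTest extends JUnit3Suite with SomeHarness {
//   override def setUp() { super.setUp() }
// }

// After (JUnit 4): the runner wires the lifecycle through annotations.
import org.junit.Assert._
import org.junit.{After, Before, Test}

class FooTest {
  @Before def setUp() { /* per-test initialization */ }
  @After def tearDown() { /* per-test cleanup */ }
  @Test def testSomething() { assertTrue(1 + 1 == 2) }
}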
@@ -19,6 +19,7 @@ import org.apache.kafka.clients.consumer._
 import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.TopicPartition
 import org.junit.Assert._
+import org.junit.Before

 import scala.collection.JavaConversions._

@@ -52,6 +53,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
       .map(KafkaConfig.fromProps(_, serverConfig))
   }

+  @Before
   override def setUp() {
     super.setUp()
@@ -25,6 +25,7 @@ import kafka.server.KafkaConfig

 import java.util.ArrayList
 import org.junit.Assert._
+import org.junit.Before

 import scala.collection.JavaConverters._
 import kafka.coordinator.ConsumerCoordinator

@@ -56,6 +57,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
   this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
   this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")

+  @Before
   override def setUp() {
     super.setUp()
@@ -25,6 +25,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.apache.kafka.clients.producer.KafkaProducer
 import kafka.server.{OffsetManager, KafkaConfig}
 import kafka.integration.KafkaServerTestHarness
+import org.junit.{After, Before}
 import scala.collection.mutable.Buffer
 import kafka.coordinator.ConsumerCoordinator

@@ -49,6 +50,7 @@ trait IntegrationTestHarness extends KafkaServerTestHarness {
     cfgs.map(KafkaConfig.fromProps)
   }

+  @Before
   override def setUp() {
     super.setUp()
     producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapUrl)

@@ -70,7 +72,8 @@ trait IntegrationTestHarness extends KafkaServerTestHarness {
       servers,
       servers(0).consumerCoordinator.offsetsTopicConfigs)
   }

+  @After
   override def tearDown() {
     producers.foreach(_.close())
     consumers.foreach(_.close())
@@ -22,7 +22,7 @@ import kafka.utils.{ShutdownableThread, TestUtils}
 import org.apache.kafka.clients.producer._
 import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
 import org.junit.Assert._
-import org.junit.Test
+import org.junit.{After, Before, Test}

 class ProducerBounceTest extends KafkaServerTestHarness {
   private val producerBufferSize = 30000

@@ -62,6 +62,7 @@ class ProducerBounceTest extends KafkaServerTestHarness {
   private val topic1 = "topic-1"
   private val topic2 = "topic-2"

+  @Before
   override def setUp() {
     super.setUp()

@@ -70,6 +71,7 @@ class ProducerBounceTest extends KafkaServerTestHarness {
     producer3 = TestUtils.createNewProducer(brokerList, acks = -1, blockOnBufferFull = false, bufferSize = producerBufferSize)
   }

+  @After
   override def tearDown() {
     if (producer1 != null) producer1.close
     if (producer2 != null) producer2.close
@@ -19,7 +19,6 @@ package kafka.api.test

 import java.util.{Properties, Collection, ArrayList}

-import org.scalatest.junit.JUnit3Suite
 import org.junit.runners.Parameterized
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized.Parameters

@@ -36,7 +35,7 @@ import kafka.utils.{CoreUtils, TestUtils}


 @RunWith(value = classOf[Parameterized])
-class ProducerCompressionTest(compression: String) extends JUnit3Suite with ZooKeeperTestHarness {
+class ProducerCompressionTest(compression: String) extends ZooKeeperTestHarness {
   private val brokerId = 0
   private var server: KafkaServer = null
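ProducerCompressionTest keeps its @RunWith(value = classOf[Parameterized]) runner while dropping JUnit3Suite. In Scala the @Parameters factory lives on the companion object, which compiles to the static method the Parameterized runner looks up. A minimal sketch of that idiom, with a hypothetical CodecNameTest and codec list that are not part of this commit:

import java.util.{Arrays => JArrays, Collection => JCollection}

import org.junit.Assert.assertEquals
import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import org.junit.runners.Parameterized.Parameters

@RunWith(value = classOf[Parameterized])
class CodecNameTest(codec: String) {
  @Test
  def codecNameIsLowerCase() {
    assertEquals(codec.toLowerCase, codec)
  }
}

object CodecNameTest {
  // The runner calls this static factory to build one test instance per parameter row.
  @Parameters
  def parameters: JCollection[Array[String]] =
    JArrays.asList(Array("none"), Array("gzip"), Array("snappy"))
}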
@@ -17,22 +17,20 @@

 package kafka.api

-import org.junit.Test
-import org.junit.Assert._
-
+import java.util.concurrent.{ExecutionException, TimeUnit, TimeoutException}
 import java.util.{Properties, Random}
-import java.util.concurrent.{TimeoutException, TimeUnit, ExecutionException}

 import kafka.common.Topic
 import kafka.consumer.SimpleConsumer
 import kafka.integration.KafkaServerTestHarness
 import kafka.server.KafkaConfig
 import kafka.utils.{ShutdownableThread, TestUtils}

-import org.apache.kafka.common.KafkaException
-import org.apache.kafka.common.errors.{InvalidTopicException, NotEnoughReplicasException, NotEnoughReplicasAfterAppendException}
 import org.apache.kafka.clients.producer._
 import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
+import org.apache.kafka.common.KafkaException
+import org.apache.kafka.common.errors.{InvalidTopicException, NotEnoughReplicasAfterAppendException, NotEnoughReplicasException}
+import org.junit.Assert._
+import org.junit.{After, Before, Test}

 class ProducerFailureHandlingTest extends KafkaServerTestHarness {
   private val producerBufferSize = 30000

@@ -61,6 +59,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
   private val topic1 = "topic-1"
   private val topic2 = "topic-2"

+  @Before
   override def setUp() {
     super.setUp()

@@ -69,6 +68,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
     producer3 = TestUtils.createNewProducer(brokerList, acks = -1, blockOnBufferFull = false, bufferSize = producerBufferSize)
   }

+  @After
   override def tearDown() {
     if (producer1 != null) producer1.close
     if (producer2 != null) producer2.close
@@ -30,11 +30,9 @@ import org.apache.kafka.common.config.ConfigException
 import org.apache.kafka.common.errors.SerializationException
 import org.apache.kafka.common.serialization.ByteArraySerializer
 import org.junit.Assert._
-import org.junit.Test
-import org.scalatest.junit.JUnit3Suite
+import org.junit.{After, Before, Test}


-class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness {
+class ProducerSendTest extends KafkaServerTestHarness {
   val numServers = 2

   val overridingProps = new Properties()

@@ -49,6 +47,7 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness {
   private val topic = "topic"
   private val numRecords = 100

+  @Before
   override def setUp() {
     super.setUp()

@@ -57,6 +56,7 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness {
     consumer2 = new SimpleConsumer("localhost", servers(1).boundPort(), 100, 1024*1024, "")
   }

+  @After
   override def tearDown() {
     consumer1.close()
     consumer2.close()
@@ -21,7 +21,7 @@ import java.security.Permission

 import kafka.server.KafkaConfig
 import org.junit.{After, Before, Test}
-import junit.framework.Assert._
+import org.junit.Assert._

 class KafkaTest {
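Most hunks in this merge are the one-line assertion swap seen here: junit.framework.Assert is the JUnit 3-era API, and org.junit.Assert exposes the same static methods, so call sites compile unchanged once the import moves. A quick hypothetical illustration (AssertSwapTest is not a file from this commit):

import org.junit.Assert._
import org.junit.Test

class AssertSwapTest {
  @Test
  def assertionsCompileUnchanged() {
    // identical call shapes under either import
    assertEquals("broker id", 0L, 0L)
    assertTrue(List(1, 2, 3).contains(2))
    assertNull(null)
  }
}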
@@ -17,17 +17,17 @@

 package kafka.admin

+import org.junit.Assert._
 import org.apache.kafka.common.protocol.SecurityProtocol
-import org.scalatest.junit.JUnit3Suite
 import kafka.zk.ZooKeeperTestHarness
 import kafka.utils.TestUtils._
-import junit.framework.Assert._
 import kafka.utils.{ZkUtils, CoreUtils, TestUtils}
 import kafka.cluster.Broker
 import kafka.client.ClientUtils
 import kafka.server.{KafkaConfig, KafkaServer}
+import org.junit.{After, Before}

-class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness {
+class AddPartitionsTest extends ZooKeeperTestHarness {
   var configs: Seq[KafkaConfig] = null
   var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
   var brokers: Seq[Broker] = Seq.empty[Broker]

@@ -39,6 +39,7 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness {
   val topic3 = "new-topic3"
   val topic4 = "new-topic4"

+  @Before
   override def setUp() {
     super.setUp()

@@ -54,6 +55,7 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness {
     createTopic(zkClient, topic4, partitionReplicaAssignment = Map(0->Seq(0,3)), servers = servers)
   }

+  @After
   override def tearDown() {
     servers.foreach(_.shutdown())
     servers.foreach(server => CoreUtils.rm(server.config.logDirs))
@@ -18,7 +18,6 @@ package kafka.admin

 import junit.framework.Assert._
 import org.junit.Test
-import org.scalatest.junit.JUnit3Suite
 import java.util.Properties
 import kafka.utils._
 import kafka.log._

@@ -30,7 +29,7 @@ import java.io.File
 import TestUtils._


-class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
+class AdminTest extends ZooKeeperTestHarness with Logging {

   @Test
   def testReplicaAssignment() {
@@ -19,15 +19,10 @@ package kafka.admin
 import junit.framework.Assert._
 import kafka.admin.ConfigCommand.ConfigCommandOptions
 import org.junit.Test
-import org.scalatest.junit.JUnit3Suite
 import kafka.utils.Logging
-import kafka.utils.TestUtils
 import kafka.zk.ZooKeeperTestHarness
-import kafka.server.{ConfigType, OffsetManager, KafkaConfig}
-import kafka.admin.TopicCommand.TopicCommandOptions
-import kafka.utils.ZkUtils

-class ConfigCommandTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
+class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
   @Test
   def testArgumentParse() {
     // Should parse correctly
@@ -16,7 +16,6 @@
  */
 package kafka.admin

-import org.scalatest.junit.JUnit3Suite
 import kafka.utils._
 import kafka.server.KafkaConfig
 import org.junit.Test

@@ -25,7 +24,7 @@ import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer}
 import kafka.integration.KafkaServerTestHarness


-class DeleteConsumerGroupTest extends JUnit3Suite with KafkaServerTestHarness {
+class DeleteConsumerGroupTest extends KafkaServerTestHarness {
   def generateConfigs() = TestUtils.createBrokerConfigs(3, zkConnect, false, true).map(KafkaConfig.fromProps)

   @Test
@@ -17,7 +17,6 @@
 package kafka.admin

 import kafka.log.Log
-import org.scalatest.junit.JUnit3Suite
 import kafka.zk.ZooKeeperTestHarness
 import junit.framework.Assert._
 import kafka.utils.{ZkUtils, TestUtils}

@@ -26,7 +25,7 @@ import org.junit.Test
 import java.util.Properties
 import kafka.common.{TopicAlreadyMarkedForDeletionException, TopicAndPartition}

-class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
+class DeleteTopicTest extends ZooKeeperTestHarness {

   @Test
   def testDeleteTopicWithAllAliveReplicas() {
@@ -18,16 +18,15 @@ package kafka.admin

 import junit.framework.Assert._
 import org.junit.Test
-import org.scalatest.junit.JUnit3Suite
 import kafka.utils.Logging
 import kafka.utils.TestUtils
 import kafka.zk.ZooKeeperTestHarness
-import kafka.server.{ConfigType, OffsetManager, KafkaConfig}
+import kafka.server.ConfigType
 import kafka.admin.TopicCommand.TopicCommandOptions
 import kafka.utils.ZkUtils
 import kafka.coordinator.ConsumerCoordinator

-class TopicCommandTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
+class TopicCommandTest extends ZooKeeperTestHarness with Logging {

   @Test
   def testConfigPreservationAcrossPartitionAlteration() {
@@ -19,7 +19,7 @@ package kafka.api

 import org.junit._
 import org.scalatest.junit.JUnitSuite
-import junit.framework.Assert._
+import org.junit.Assert._
 import scala.util.Random
 import java.nio.ByteBuffer
 import kafka.common.KafkaException

@@ -32,7 +32,7 @@ import java.nio.ByteBuffer
 import org.apache.kafka.common.protocol.SecurityProtocol
 import org.junit._
 import org.scalatest.junit.JUnitSuite
-import junit.framework.Assert._
+import org.junit.Assert._


 object SerializationTestUtils {
@@ -22,11 +22,10 @@ import java.nio.ByteBuffer
 import kafka.utils.Logging
 import org.apache.kafka.common.protocol.SecurityProtocol
 import org.junit.Test
-import org.scalatest.junit.JUnit3Suite

 import scala.collection.mutable

-class BrokerEndPointTest extends JUnit3Suite with Logging {
+class BrokerEndPointTest extends Logging {

   @Test
   def testSerDe() = {
@@ -17,7 +17,7 @@

 package kafka.common

-import junit.framework.Assert._
+import org.junit.Assert._
 import collection.mutable.ArrayBuffer
 import org.junit.Test
 import kafka.producer.ProducerConfig

@@ -17,7 +17,7 @@

 package kafka.common

-import junit.framework.Assert._
+import org.junit.Assert._
 import collection.mutable.ArrayBuffer
 import org.junit.Test
@@ -18,22 +18,20 @@

 package kafka.consumer

 import java.util.Properties
 import java.util.concurrent._
 import java.util.concurrent.atomic._
 import scala.collection._
-import junit.framework.Assert._
+import org.junit.Assert._

 import kafka.message._
 import kafka.server._
 import kafka.utils.TestUtils._
 import kafka.utils._
-import org.junit.Test
+import org.junit.{Before, Test}
 import kafka.serializer._
-import org.scalatest.junit.JUnit3Suite
 import kafka.integration.KafkaServerTestHarness

-class ConsumerIteratorTest extends JUnit3Suite with KafkaServerTestHarness {
+class ConsumerIteratorTest extends KafkaServerTestHarness {

   val numNodes = 1

@@ -49,6 +47,7 @@ class ConsumerIteratorTest extends JUnit3Suite with KafkaServerTestHarness {

   def consumerConfig = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer0))

+  @Before
   override def setUp() {
     super.setUp()
     topicInfos = configs.map(c => new PartitionTopicInfo(topic,
@@ -17,18 +17,17 @@

 package kafka.consumer

-import org.scalatest.junit.JUnit3Suite
 import org.easymock.EasyMock
 import org.I0Itec.zkclient.ZkClient
 import org.apache.zookeeper.data.Stat
 import kafka.utils.{TestUtils, Logging, ZkUtils, Json}
-import junit.framework.Assert._
+import org.junit.Assert._
 import kafka.common.TopicAndPartition
 import kafka.consumer.PartitionAssignorTest.StaticSubscriptionInfo
 import kafka.consumer.PartitionAssignorTest.Scenario
 import kafka.consumer.PartitionAssignorTest.WildcardSubscriptionInfo

-class PartitionAssignorTest extends JUnit3Suite with Logging {
+class PartitionAssignorTest extends Logging {

   def testRoundRobinPartitionAssignor() {
     val assignor = new RoundRobinAssignor
@@ -18,7 +18,7 @@
 package kafka.consumer


-import junit.framework.Assert._
+import org.junit.Assert._
 import org.scalatest.junit.JUnitSuite
 import org.junit.Test
 import kafka.server.OffsetManager
@@ -19,7 +19,7 @@ package kafka.consumer

 import java.util.{Collections, Properties}

-import junit.framework.Assert._
+import org.junit.Assert._
 import kafka.common.MessageStreamsExistException
 import kafka.integration.KafkaServerTestHarness
 import kafka.javaapi.consumer.ConsumerRebalanceListener

@@ -30,11 +30,11 @@ import kafka.utils.TestUtils._
 import kafka.utils._
 import org.I0Itec.zkclient.ZkClient
 import org.apache.log4j.{Level, Logger}
-import org.scalatest.junit.JUnit3Suite
+import org.junit.{After, Before}

 import scala.collection._

-class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with Logging {
+class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging {

   val RebalanceBackoffMs = 5000
   var dirs : ZKGroupTopicDirs = null

@@ -54,11 +54,13 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
   val consumer3 = "consumer3"
   val nMessages = 2

+  @Before
   override def setUp() {
     super.setUp()
     dirs = new ZKGroupTopicDirs(group, topic)
   }

+  @After
   override def tearDown() {
     super.tearDown()
   }
@@ -20,10 +20,10 @@ package kafka.coordinator

 import java.util.concurrent.TimeUnit

-import junit.framework.Assert._
+import org.junit.Assert._
 import kafka.common.TopicAndPartition
-import kafka.server.{OffsetManager, ReplicaManager, KafkaConfig}
-import kafka.utils.{KafkaScheduler, TestUtils}
+import kafka.server.{OffsetManager, KafkaConfig}
+import kafka.utils.TestUtils
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.JoinGroupRequest
 import org.easymock.EasyMock

@@ -17,7 +17,7 @@

 package kafka.coordinator

-import junit.framework.Assert._
+import org.junit.Assert._
 import org.junit.{Before, Test}
 import org.scalatest.junit.JUnitSuite

@@ -20,7 +20,7 @@ package kafka.coordinator
 import kafka.server.KafkaConfig
 import kafka.utils.{ZkUtils, TestUtils}

-import junit.framework.Assert._
+import org.junit.Assert._
 import org.I0Itec.zkclient.{IZkDataListener, ZkClient}
 import org.apache.zookeeper.data.Stat
 import org.easymock.EasyMock

@@ -19,7 +19,7 @@ package kafka.coordinator

 import kafka.common.TopicAndPartition

-import junit.framework.Assert._
+import org.junit.Assert._
 import org.junit.Test
 import org.scalatest.junit.JUnitSuite
@@ -24,12 +24,11 @@ import kafka.utils.TestUtils
 import kafka.serializer._
 import kafka.producer.{Producer, KeyedMessage}

-import org.junit.Test
+import org.junit.{After, Before, Test}
 import org.apache.log4j.{Level, Logger}
-import org.scalatest.junit.JUnit3Suite
-import junit.framework.Assert._
+import org.junit.Assert._

-class AutoOffsetResetTest extends JUnit3Suite with KafkaServerTestHarness with Logging {
+class AutoOffsetResetTest extends KafkaServerTestHarness with Logging {

   def generateConfigs() = List(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect)))

@@ -42,12 +41,14 @@ class AutoOffsetResetTest extends JUnit3Suite with KafkaServerTestHarness with L

   val requestHandlerLogger = Logger.getLogger(classOf[kafka.server.KafkaRequestHandler])

+  @Before
   override def setUp() {
     super.setUp()
     // temporarily set request handler logger to a higher level
     requestHandlerLogger.setLevel(Level.FATAL)
   }

+  @After
   override def tearDown() {
     // restore set request handler logger to a higher level
     requestHandlerLogger.setLevel(Level.ERROR)
@@ -19,18 +19,17 @@ package kafka.integration

 import java.util.concurrent._
 import java.util.concurrent.atomic._
+import org.junit.{After, Before}

 import scala.collection._
-import junit.framework.Assert._
+import org.junit.Assert._

 import kafka.cluster._
 import kafka.server._
-import org.scalatest.junit.JUnit3Suite
 import kafka.consumer._
 import kafka.serializer._
 import kafka.producer.{KeyedMessage, Producer}
 import kafka.utils.TestUtils

-class FetcherTest extends JUnit3Suite with KafkaServerTestHarness {
+class FetcherTest extends KafkaServerTestHarness {
   val numNodes = 1
   def generateConfigs() = TestUtils.createBrokerConfigs(numNodes, zkConnect).map(KafkaConfig.fromProps)

@@ -40,6 +39,7 @@ class FetcherTest extends JUnit3Suite with KafkaServerTestHarness {

   var fetcher: ConsumerFetcherManager = null

+  @Before
   override def setUp() {
     super.setUp
     TestUtils.createTopic(zkClient, topic, partitionReplicaAssignment = Map(0 -> Seq(configs.head.brokerId)), servers = servers)

@@ -59,6 +59,7 @@ class FetcherTest extends JUnit3Suite with KafkaServerTestHarness {
     fetcher.startConnections(topicInfos, cluster)
   }

+  @After
   override def tearDown() {
     fetcher.stopConnections()
     super.tearDown
@@ -19,17 +19,18 @@ package kafka.integration

 import java.util.Arrays

-import scala.collection.mutable.Buffer
+import kafka.common.KafkaException
 import kafka.server._
 import kafka.utils.{CoreUtils, TestUtils}
-import org.scalatest.junit.JUnit3Suite
 import kafka.zk.ZooKeeperTestHarness
-import kafka.common.KafkaException
+import org.junit.{After, Before}

+import scala.collection.mutable.Buffer

 /**
  * A test harness that brings up some number of broker nodes
  */
-trait KafkaServerTestHarness extends JUnit3Suite with ZooKeeperTestHarness {
+trait KafkaServerTestHarness extends ZooKeeperTestHarness {
   var instanceConfigs: Seq[KafkaConfig] = null
   var servers: Buffer[KafkaServer] = null
   var brokerList: String = null

@@ -51,7 +52,7 @@ trait KafkaServerTestHarness extends JUnit3Suite with ZooKeeperTestHarness {

   def bootstrapUrl = servers.map(s => s.config.hostName + ":" + s.boundPort()).mkString(",")

-
+  @Before
   override def setUp() {
     super.setUp
     if(configs.size <= 0)

@@ -62,6 +63,7 @@ trait KafkaServerTestHarness extends JUnit3Suite with ZooKeeperTestHarness {
     Arrays.fill(alive, true)
   }

+  @After
   override def tearDown() {
     servers.foreach(_.shutdown())
     servers.foreach(_.config.logDirs.foreach(CoreUtils.rm(_)))
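KafkaServerTestHarness shows the inheritance side of the migration: JUnit 3 ran the setUp/tearDown chain implicitly, while JUnit 4 executes an overridden @Before method only once (the most-derived override), so each harness level must override and chain to super explicitly, exactly as the hunks above do. A minimal sketch of that idiom under those assumptions, with hypothetical names and abstract classes for simplicity:

import org.junit.Assert.assertTrue
import org.junit.{After, Before, Test}

abstract class ZkHarness {
  var zkUp = false
  @Before def setUp() { zkUp = true }
  @After def tearDown() { zkUp = false }
}

abstract class ServerHarness extends ZkHarness {
  var serverUp = false
  @Before override def setUp() {
    super.setUp()    // explicit chaining replaces JUnit 3's implicit super call
    serverUp = true
  }
  @After override def tearDown() {
    serverUp = false
    super.tearDown() // tear down in reverse order of setup
  }
}

class HarnessChainTest extends ServerHarness {
  @Test def everythingStarted() { assertTrue(zkUp && serverUp) }
}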
@@ -21,9 +21,8 @@ import java.util.Properties

 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
-import org.scalatest.junit.JUnit3Suite

-class MinIsrConfigTest extends JUnit3Suite with KafkaServerTestHarness {
+class MinIsrConfigTest extends KafkaServerTestHarness {

   val overridingProps = new Properties()
   overridingProps.put(KafkaConfig.MinInSyncReplicasProp, "5")
@@ -18,13 +18,12 @@
 package kafka.integration

 import java.nio.ByteBuffer
-import junit.framework.Assert._
+import org.junit.Assert._
 import kafka.api.{PartitionFetchInfo, FetchRequest, FetchRequestBuilder}
 import kafka.server.{KafkaRequestHandler, KafkaConfig}
 import kafka.producer.{KeyedMessage, Producer}
 import org.apache.log4j.{Level, Logger}
 import kafka.zk.ZooKeeperTestHarness
-import org.scalatest.junit.JUnit3Suite
 import scala.collection._
 import kafka.common.{TopicAndPartition, ErrorMapping, UnknownTopicOrPartitionException, OffsetOutOfRangeException}
 import kafka.utils.{StaticPartitioner, TestUtils, CoreUtils}

@@ -34,7 +33,7 @@ import java.util.Properties
 /**
  * End to end tests of the primitive apis against a local server
  */
-class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with ZooKeeperTestHarness {
+class PrimitiveApiTest extends ProducerConsumerTestHarness with ZooKeeperTestHarness {
   val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])

   def generateConfigs() = List(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect)))
@@ -5,8 +5,8 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,28 +18,30 @@
 package kafka.integration

 import kafka.consumer.SimpleConsumer
-import org.scalatest.junit.JUnit3Suite
+import org.junit.{After, Before}
 import kafka.producer.Producer
 import kafka.utils.{StaticPartitioner, TestUtils}
 import kafka.serializer.StringEncoder

-trait ProducerConsumerTestHarness extends JUnit3Suite with KafkaServerTestHarness {
-    val host = "localhost"
-    var producer: Producer[String, String] = null
-    var consumer: SimpleConsumer = null
+trait ProducerConsumerTestHarness extends KafkaServerTestHarness {
+  val host = "localhost"
+  var producer: Producer[String, String] = null
+  var consumer: SimpleConsumer = null

+  @Before
   override def setUp() {
-      super.setUp
-      producer = TestUtils.createProducer[String, String](TestUtils.getBrokerListStrFromServers(servers),
-        encoder = classOf[StringEncoder].getName,
-        keyEncoder = classOf[StringEncoder].getName,
-        partitioner = classOf[StaticPartitioner].getName)
-      consumer = new SimpleConsumer(host, servers(0).boundPort(), 1000000, 64*1024, "")
-    }
+    super.setUp
+    producer = TestUtils.createProducer[String, String](TestUtils.getBrokerListStrFromServers(servers),
+      encoder = classOf[StringEncoder].getName,
+      keyEncoder = classOf[StringEncoder].getName,
+      partitioner = classOf[StaticPartitioner].getName)
+    consumer = new SimpleConsumer(host, servers(0).boundPort(), 1000000, 64 * 1024, "")
+  }

-  override def tearDown() {
-    producer.close()
-    consumer.close()
-    super.tearDown
-  }
+  @After
+  override def tearDown() {
+    producer.close()
+    consumer.close()
+    super.tearDown
+  }
 }
@@ -17,18 +17,19 @@

 package kafka.integration

-import org.scalatest.junit.JUnit3Suite
+import org.junit.{After, Before}
 import kafka.zk.ZooKeeperTestHarness
 import kafka.utils.TestUtils._
-import junit.framework.Assert._
+import org.junit.Assert._
 import kafka.utils.{CoreUtils, TestUtils}
 import kafka.server.{KafkaConfig, KafkaServer}

-class RollingBounceTest extends JUnit3Suite with ZooKeeperTestHarness {
+class RollingBounceTest extends ZooKeeperTestHarness {

   val partitionId = 0
   var servers: Seq[KafkaServer] = null

+  @Before
   override def setUp() {
     super.setUp()
     // controlled.shutdown.enable is true by default

@@ -39,6 +40,7 @@ class RollingBounceTest extends JUnit3Suite with ZooKeeperTestHarness {
     servers = configs.map(c => TestUtils.createServer(KafkaConfig.fromProps(c)))
   }

+  @After
   override def tearDown() {
     servers.foreach(_.shutdown())
     servers.foreach(server => CoreUtils.rm(server.config.logDirs))
@@ -19,9 +19,8 @@ package kafka.integration

 import java.nio.ByteBuffer

-import junit.framework.Assert._
 import kafka.admin.AdminUtils
-import kafka.api.{TopicMetadataResponse, TopicMetadataRequest}
+import kafka.api.{TopicMetadataRequest, TopicMetadataResponse}
 import kafka.client.ClientUtils
 import kafka.cluster.{Broker, BrokerEndPoint}
 import kafka.common.ErrorMapping

@@ -30,14 +29,16 @@ import kafka.utils.TestUtils
 import kafka.utils.TestUtils._
 import kafka.zk.ZooKeeperTestHarness
 import org.apache.kafka.common.protocol.SecurityProtocol
-import org.scalatest.junit.JUnit3Suite
+import org.junit.Assert._
+import org.junit.{After, Before}

-class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
+class TopicMetadataTest extends ZooKeeperTestHarness {
   private var server1: KafkaServer = null
   var brokerEndPoints: Seq[BrokerEndPoint] = null
   var adHocConfigs: Seq[KafkaConfig] = null
   val numConfigs: Int = 4

+  @Before
   override def setUp() {
     super.setUp()
     val props = createBrokerConfigs(numConfigs, zkConnect)

@@ -47,6 +48,7 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
     brokerEndPoints = Seq(new Broker(server1.config.brokerId, server1.config.hostName, server1.boundPort()).getBrokerEndPoint(SecurityProtocol.PLAINTEXT))
   }

+  @After
   override def tearDown() {
     server1.shutdown()
     super.tearDown()
@@ -18,24 +18,22 @@
 package kafka.integration

+import org.apache.kafka.common.config.ConfigException
+import org.junit.{After, Before}

 import scala.collection.mutable.MutableList
 import scala.util.Random
 import org.apache.log4j.{Level, Logger}
-import org.scalatest.junit.JUnit3Suite
 import java.util.Properties
-import junit.framework.Assert._
 import kafka.admin.AdminUtils
 import kafka.common.FailedToSendMessageException
-import kafka.consumer.{Consumer, ConsumerConfig, ConsumerTimeoutException}
 import kafka.producer.{KeyedMessage, Producer}
+import kafka.consumer.{Consumer, ConsumerConfig}
 import kafka.serializer.StringDecoder
 import kafka.server.{KafkaConfig, KafkaServer}
 import kafka.utils.CoreUtils
 import kafka.utils.TestUtils._
 import kafka.zk.ZooKeeperTestHarness
+import org.junit.Assert._

-class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
+class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
   val brokerId1 = 0
   val brokerId2 = 1

@@ -58,6 +56,7 @@ class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
   val syncProducerLogger = Logger.getLogger(classOf[kafka.producer.SyncProducer])
   val eventHandlerLogger = Logger.getLogger(classOf[kafka.producer.async.DefaultEventHandler[Object, Object]])

+  @Before
   override def setUp() {
     super.setUp()

@@ -77,6 +76,7 @@ class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
     eventHandlerLogger.setLevel(Level.FATAL)
   }

+  @After
   override def tearDown() {
     servers.foreach(server => shutdownServer(server))
     servers.foreach(server => CoreUtils.rm(server.config.logDirs))
@@ -20,7 +20,6 @@ package kafka.javaapi.consumer
 import java.util.Properties

 import kafka.server._
 import kafka.message._
 import kafka.serializer._
 import kafka.integration.KafkaServerTestHarness
 import kafka.producer.KeyedMessage

@@ -33,12 +32,11 @@ import kafka.common.MessageStreamsExistException

 import scala.collection.JavaConversions

-import org.scalatest.junit.JUnit3Suite
 import org.apache.log4j.{Level, Logger}
-import junit.framework.Assert._
+import org.junit.Assert._


-class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with ZooKeeperTestHarness with Logging {
+class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with ZooKeeperTestHarness with Logging {
   val numNodes = 2
   val numParts = 2
   val topic = "topic1"
@@ -17,7 +17,7 @@

 package kafka.javaapi.message

-import junit.framework.Assert._
+import org.junit.Assert._
 import org.scalatest.junit.JUnitSuite
 import org.junit.Test
 import kafka.utils.TestUtils

@@ -17,7 +17,7 @@

 package kafka.javaapi.message

-import junit.framework.Assert._
+import org.junit.Assert._
 import org.junit.Test
 import kafka.message.{DefaultCompressionCodec, CompressionCodec, NoCompressionCodec, Message}
@@ -17,19 +17,20 @@

 package kafka.log

-import java.util.Properties
-
-import junit.framework.Assert._
-import org.scalatest.junit.JUnitSuite
-import org.junit.{After, Test}
-import java.nio._
 import java.io.File
-import scala.collection._
-import kafka.common._
-import kafka.utils._
-import kafka.message._
+import java.nio._
+import java.util.Properties
+import java.util.concurrent.atomic.AtomicLong

+import kafka.common._
+import kafka.message._
+import kafka.utils._
+import org.apache.kafka.common.utils.Utils
+import org.junit.Assert._
+import org.junit.{After, Test}
+import org.scalatest.junit.JUnitSuite

+import scala.collection._

 /**
  * Unit tests for the log cleaning logic
@@ -20,7 +20,7 @@ package kafka.log
 import java.io._
 import java.nio._
 import java.util.concurrent.atomic._
-import junit.framework.Assert._
+import org.junit.Assert._
 import kafka.utils.TestUtils._
 import kafka.message._
 import org.junit.Test
@@ -30,16 +30,14 @@ import org.junit._
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
 import org.junit.runners.Parameterized.Parameters
-import org.scalatest.junit.JUnit3Suite

 import scala.collection._


 /**
  * This is an integration test that tests the fully integrated log cleaner
  */
 @RunWith(value = classOf[Parameterized])
-class LogCleanerIntegrationTest(compressionCodec: String) extends JUnit3Suite {
+class LogCleanerIntegrationTest(compressionCodec: String) {

   val time = new MockTime()
   val segmentSize = 100
@@ -21,9 +21,9 @@ import java.util.Properties

 import org.apache.kafka.common.config.ConfigException
 import org.junit.{Assert, Test}
-import org.scalatest.junit.JUnit3Suite
+import org.scalatest.Assertions._

-class LogConfigTest extends JUnit3Suite {
+class LogConfigTest {

   @Test
   def testFromPropsEmpty() {
@@ -19,14 +19,14 @@ package kafka.log

 import java.io._
 import java.util.Properties
-import junit.framework.Assert._
-import org.junit.Test
-import org.scalatest.junit.JUnit3Suite
-import kafka.server.{BrokerState, OffsetCheckpoint}
-import kafka.common._
-import kafka.utils._
-
-class LogManagerTest extends JUnit3Suite {
+import kafka.common._
+import kafka.server.OffsetCheckpoint
+import kafka.utils._
+import org.junit.Assert._
+import org.junit.{After, Before, Test}
+
+class LogManagerTest {

   val time: MockTime = new MockTime()
   val maxRollInterval = 100

@@ -41,20 +41,20 @@ class LogManagerTest extends JUnit3Suite {
   val name = "kafka"
   val veryLargeLogFlushInterval = 10000000L

-  override def setUp() {
-    super.setUp()
+  @Before
+  def setUp() {
     logDir = TestUtils.tempDir()
     logManager = createLogManager()
     logManager.startup
     logDir = logManager.logDirs(0)
   }

-  override def tearDown() {
+  @After
+  def tearDown() {
     if(logManager != null)
       logManager.shutdown()
     CoreUtils.rm(logDir)
     logManager.logDirs.foreach(CoreUtils.rm(_))
-    super.tearDown()
   }

   /**
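Where a test has no harness superclass, as in the LogManagerTest hunk above, the migration drops override and the super calls entirely: the annotations alone drive the lifecycle. A minimal self-contained sketch of that shape (TempDirTest is hypothetical, not part of this commit):

import java.io.File

import org.junit.Assert.assertTrue
import org.junit.{After, Before, Test}

class TempDirTest {
  var dir: File = null

  @Before
  def setUp() {
    // runs before every @Test method; no super.setUp() is needed
    dir = File.createTempFile("log-manager-sketch", "")
    dir.delete()
    dir.mkdir()
  }

  @After
  def tearDown() {
    // runs after every @Test method, even if the test fails
    assertTrue(dir.delete())
  }

  @Test
  def dirExistsDuringTest() {
    assertTrue(dir.isDirectory)
  }
}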
@@ -16,19 +16,15 @@
  */
 package kafka.log

-import junit.framework.Assert._
+import org.junit.Assert._
 import java.util.concurrent.atomic._
 import java.io.File
 import java.io.RandomAccessFile
 import java.util.Random
 import org.junit.{Test, After}
-import org.scalatest.junit.JUnit3Suite
 import kafka.utils.TestUtils
 import kafka.message._
 import kafka.utils.SystemTime
 import scala.collection._

-class LogSegmentTest extends JUnit3Suite {
+class LogSegmentTest {

   val segments = mutable.ArrayBuffer[LogSegment]()
@@ -20,7 +20,7 @@ package kafka.log
 import java.io._
 import java.util.Properties
 import java.util.concurrent.atomic._
-import junit.framework.Assert._
+import org.junit.Assert._
 import org.scalatest.junit.JUnitSuite
 import org.junit.{After, Before, Test}
 import kafka.message._
@@ -18,7 +18,7 @@
 package kafka.log

 import java.io._
-import junit.framework.Assert._
+import org.junit.Assert._
 import java.util.{Collections, Arrays}
 import org.junit._
 import org.scalatest.junit.JUnitSuite
@@ -20,7 +20,7 @@ package kafka.log
 import java.nio._
 import org.junit._
 import org.scalatest.junit.JUnitSuite
-import junit.framework.Assert._
+import org.junit.Assert._

 class OffsetMapTest extends JUnitSuite {
@@ -18,7 +18,7 @@
 package kafka.message

 import java.io.RandomAccessFile
-import junit.framework.Assert._
+import org.junit.Assert._
 import kafka.utils.TestUtils._
 import kafka.log.FileMessageSet
 import org.scalatest.junit.JUnitSuite
@@ -19,7 +19,7 @@ package kafka.message

 import java.nio._
 import java.util.concurrent.atomic.AtomicLong
-import junit.framework.Assert._
+import org.junit.Assert._
 import org.junit.Test
 import kafka.utils.TestUtils
@@ -21,7 +21,7 @@ import java.io.ByteArrayOutputStream
 import scala.collection._
 import org.scalatest.junit.JUnitSuite
 import org.junit._
-import junit.framework.Assert._
+import org.junit.Assert._

 class MessageCompressionTest extends JUnitSuite {
@@ -20,7 +20,7 @@ package kafka.message
 import java.nio._
 import java.util.HashMap
 import scala.collection._
-import junit.framework.Assert._
+import org.junit.Assert._
 import org.scalatest.junit.JUnitSuite
 import org.junit.{Before, Test}
 import kafka.utils.TestUtils
@@ -20,7 +20,7 @@ package kafka.message
 import java.io.{InputStream, ByteArrayInputStream, ByteArrayOutputStream}
 import java.nio.ByteBuffer
 import java.util.Random
-import junit.framework.Assert._
+import org.junit.Assert._
 import org.junit.Test
 import org.scalatest.junit.JUnitSuite
@@ -18,12 +18,11 @@ package kafka.metrics
  */

 import org.junit.Test
-import org.scalatest.junit.JUnit3Suite
 import java.util.concurrent.TimeUnit
-import junit.framework.Assert._
+import org.junit.Assert._
 import com.yammer.metrics.core.{MetricsRegistry, Clock}

-class KafkaTimerTest extends JUnit3Suite {
+class KafkaTimerTest {

   @Test
   def testKafkaTimer() {
@@ -21,11 +21,10 @@ import java.util.Properties

 import com.yammer.metrics.Metrics
 import com.yammer.metrics.core.MetricPredicate
-import org.junit.Test
-import junit.framework.Assert._
+import org.junit.{After, Test}
+import org.junit.Assert._
 import kafka.integration.KafkaServerTestHarness
 import kafka.server._
 import kafka.message._
 import kafka.serializer._
 import kafka.utils._
 import kafka.admin.AdminUtils

@@ -33,9 +32,8 @@ import kafka.utils.TestUtils._
 import scala.collection._
 import scala.collection.JavaConversions._
 import scala.util.matching.Regex
-import org.scalatest.junit.JUnit3Suite

-class MetricsTest extends JUnit3Suite with KafkaServerTestHarness with Logging {
+class MetricsTest extends KafkaServerTestHarness with Logging {
   val numNodes = 2
   val numParts = 2
   val topic = "topic1"

@@ -48,6 +46,7 @@ class MetricsTest extends JUnit3Suite with KafkaServerTestHarness with Logging {

   val nMessages = 2

+  @After
   override def tearDown() {
     super.tearDown()
   }
@@ -17,27 +17,26 @@

 package kafka.network;

-import java.net._
 import java.io._
+import java.net._
+import java.nio.ByteBuffer
+import java.util.Random
+
+import kafka.api.ProducerRequest
+import kafka.cluster.EndPoint
+import kafka.common.TopicAndPartition
+import kafka.message.ByteBufferMessageSet
+import kafka.producer.SyncProducerConfig
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.network.NetworkSend
+import org.apache.kafka.common.protocol.SecurityProtocol
+import org.apache.kafka.common.utils.SystemTime
+import org.junit.Assert._
 import org.junit._
 import org.scalatest.junit.JUnitSuite
-import java.util.Random
-import junit.framework.Assert._
-import kafka.producer.SyncProducerConfig
-import kafka.api.ProducerRequest
-import java.nio.ByteBuffer
-import kafka.common.TopicAndPartition
-import kafka.message.ByteBufferMessageSet
-import java.nio.channels.SelectionKey
-import kafka.utils.TestUtils

 import scala.collection.Map

-class SocketServerTest extends JUnitSuite {
+class SocketServerTest {

   val server: SocketServer = new SocketServer(0,
     Map(SecurityProtocol.PLAINTEXT -> EndPoint(null, 0, SecurityProtocol.PLAINTEXT),

@@ -84,11 +83,11 @@ class SocketServerTest extends JUnitSuite {
     new Socket("localhost", server.boundPort(protocol))
   }

   @After
   def cleanup() {
     server.shutdown()
   }

   @Test
   def simpleRequest() {
     val plainSocket = connect(protocol = SecurityProtocol.PLAINTEXT)

@@ -175,7 +174,7 @@
   }

   @Test
-  def testMaxConnectionsPerIPOverrides(): Unit = {
+  def testMaxConnectionsPerIPOverrides() {
     val overrideNum = 6
     val overrides: Map[String, Int] = Map("localhost" -> overrideNum)
     val overrideServer: SocketServer = new SocketServer(0,
@@ -19,36 +19,27 @@ package kafka.producer

 import java.util.Properties
 import java.util.concurrent.LinkedBlockingQueue
-import junit.framework.Assert._
+import org.junit.Assert._
 import org.easymock.EasyMock
 import org.junit.Test
 import kafka.api._
-import kafka.cluster.{BrokerEndPoint, Broker}
+import kafka.cluster.BrokerEndPoint
 import kafka.common._
 import kafka.message._
 import kafka.producer.async._
 import kafka.serializer._
 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils._
-import org.scalatest.junit.JUnit3Suite
 import scala.collection.Map
 import scala.collection.mutable.ArrayBuffer
 import kafka.utils._

-class AsyncProducerTest extends JUnit3Suite {
+class AsyncProducerTest {
   // One of the few cases we can just set a fixed port because the producer is mocked out here since this uses mocks
   val props = Seq(createBrokerConfig(1, "127.0.0.1:1", port=65534))
   val configs = props.map(KafkaConfig.fromProps)
   val brokerList = configs.map(c => org.apache.kafka.common.utils.Utils.formatAddress(c.hostName, c.port)).mkString(",")

-  override def setUp() {
-    super.setUp()
-  }
-
-  override def tearDown() {
-    super.tearDown()
-  }
-
   @Test
   def testProducerQueueSize() {
     // a mock event handler that blocks
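The AsyncProducerTest hunk also deletes setUp/tearDown overrides whose only job was to call super: mandatory glue under JUnit 3, dead weight under JUnit 4, where a class with no annotated lifecycle methods simply runs its @Test methods. A trivial hypothetical sketch:

import org.junit.Assert.assertEquals
import org.junit.Test

// No @Before/@After needed when there is no per-test state to manage.
class NoLifecycleTest {
  @Test
  def formatAddressStyle() {
    assertEquals("127.0.0.1:1", "127.0.0.1" + ":" + 1)
  }
}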
@@ -17,26 +17,24 @@

 package kafka.producer

-import org.scalatest.TestFailedException
-import org.scalatest.junit.JUnit3Suite
+import java.util
+import java.util.Properties
+
+import kafka.admin.AdminUtils
+import kafka.api.FetchRequestBuilder
+import kafka.common.{ErrorMapping, FailedToSendMessageException}
 import kafka.consumer.SimpleConsumer
 import kafka.message.Message
+import kafka.serializer.StringEncoder
 import kafka.server.{KafkaConfig, KafkaRequestHandler, KafkaServer}
+import kafka.utils._
 import kafka.zk.ZooKeeperTestHarness
 import org.apache.log4j.{Level, Logger}
-import org.junit.Test
-import kafka.utils._
-import java.util
-import kafka.admin.AdminUtils
-import util.Properties
-import kafka.api.FetchRequestBuilder
-import org.junit.Assert.assertTrue
-import org.junit.Assert.assertFalse
-import org.junit.Assert.assertEquals
-import kafka.common.{ErrorMapping, FailedToSendMessageException}
-import kafka.serializer.StringEncoder
+import org.junit.Assert._
+import org.junit.{After, Before, Test}
+import org.scalatest.exceptions.TestFailedException

-class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
+class ProducerTest extends ZooKeeperTestHarness with Logging{
   private val brokerId1 = 0
   private val brokerId2 = 1
   private var server1: KafkaServer = null

@@ -60,6 +58,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
     consumer2
   }

+  @Before
   override def setUp() {
     super.setUp()
     // set up 2 brokers with 4 partitions each

@@ -81,6 +80,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
     requestHandlerLogger.setLevel(Level.FATAL)
   }

+  @After
   override def tearDown() {
     // restore set request handler logger to a higher level
     requestHandlerLogger.setLevel(Level.ERROR)
@@ -20,7 +20,7 @@ package kafka.producer
 import java.net.SocketTimeoutException
 import java.util.Properties

-import junit.framework.Assert
+import org.junit.Assert
 import kafka.admin.AdminUtils
 import kafka.api.ProducerResponseStatus
 import kafka.common.{ErrorMapping, TopicAndPartition}

@@ -30,9 +30,8 @@ import kafka.server.KafkaConfig
 import kafka.utils._
 import org.apache.kafka.common.protocol.SecurityProtocol
 import org.junit.Test
-import org.scalatest.junit.JUnit3Suite

-class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
+class SyncProducerTest extends KafkaServerTestHarness {
   private val messageBytes = new Array[Byte](2)
   // turning off controlled shutdown since testProducerCanTimeout() explicitly shuts down request handler pool.
   def generateConfigs() = List(KafkaConfig.fromProps(TestUtils.createBrokerConfigs(1, zkConnect, false).head))
@@ -17,18 +17,19 @@

 package kafka.server

-import junit.framework.Assert._
+import org.junit.Assert._
 import kafka.utils.{TestUtils, CoreUtils, ZkUtils}
 import kafka.zk.ZooKeeperTestHarness
 import org.apache.kafka.common.protocol.SecurityProtocol
-import org.scalatest.junit.JUnit3Suite
+import org.junit.{After, Before}

-class AdvertiseBrokerTest extends JUnit3Suite with ZooKeeperTestHarness {
+class AdvertiseBrokerTest extends ZooKeeperTestHarness {
   var server : KafkaServer = null
   val brokerId = 0
   val advertisedHostName = "routable-host"
   val advertisedPort = 1234

+  @Before
   override def setUp() {
     super.setUp()

@@ -39,6 +40,7 @@ class AdvertiseBrokerTest extends JUnit3Suite with ZooKeeperTestHarness {
     server = TestUtils.createServer(KafkaConfig.fromProps(props))
   }

+  @After
   override def tearDown() {
     server.shutdown()
     CoreUtils.rm(server.config.logDirs)
@@ -17,22 +17,21 @@

 package kafka.server

-import org.junit.Test
-import org.scalatest.junit.JUnit3Suite
-import junit.framework.Assert._
+import org.junit.{After, Before, Test}
+import org.junit.Assert._

-class DelayedOperationTest extends JUnit3Suite {
+class DelayedOperationTest {

   var purgatory: DelayedOperationPurgatory[MockDelayedOperation] = null

-  override def setUp() {
-    super.setUp()
+  @Before
+  def setUp() {
     purgatory = new DelayedOperationPurgatory[MockDelayedOperation](purgatoryName = "mock")
   }

-  override def tearDown() {
+  @After
+  def tearDown() {
     purgatory.shutdown()
-    super.tearDown()
   }

   @Test
@@ -26,9 +26,8 @@ import kafka.utils._
 import kafka.common._
 import kafka.log.LogConfig
 import kafka.admin.{AdminOperationException, AdminUtils}
-import org.scalatest.junit.JUnit3Suite

-class DynamicConfigChangeTest extends JUnit3Suite with KafkaServerTestHarness {
+class DynamicConfigChangeTest extends KafkaServerTestHarness {
   def generateConfigs() = List(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect)))

   @Test
@@ -19,7 +19,6 @@ package kafka.server
 import kafka.log._
 import java.io.File
 import org.I0Itec.zkclient.ZkClient
-import org.scalatest.junit.JUnit3Suite
 import org.easymock.EasyMock
 import org.junit._
 import org.junit.Assert._

@@ -28,7 +27,7 @@ import kafka.cluster.Replica
 import kafka.utils.{SystemTime, KafkaScheduler, TestUtils, MockTime, CoreUtils}
 import java.util.concurrent.atomic.AtomicBoolean

-class HighwatermarkPersistenceTest extends JUnit3Suite {
+class HighwatermarkPersistenceTest {

   val configs = TestUtils.createBrokerConfigs(2, TestUtils.MockZkConnect).map(KafkaConfig.fromProps)
   val topic = "foo"
@@ -18,7 +18,7 @@ package kafka.server

 import java.util.Properties

-import org.scalatest.junit.JUnit3Suite
+import org.junit.{Before, After}
 import collection.mutable.HashMap
 import collection.mutable.Map
 import kafka.cluster.{Partition, Replica}

@@ -30,7 +30,7 @@ import java.util.concurrent.atomic.AtomicBoolean
 import kafka.message.MessageSet


-class IsrExpirationTest extends JUnit3Suite {
+class IsrExpirationTest {

   var topicPartitionIsr: Map[(String, Int), Seq[Int]] = new HashMap[(String, Int), Seq[Int]]()
   val replicaLagTimeMaxMs = 100L

@@ -46,14 +46,14 @@ class IsrExpirationTest extends JUnit3Suite {

   var replicaManager: ReplicaManager = null

-  override def setUp() {
-    super.setUp()
+  @Before
+  def setUp() {
     replicaManager = new ReplicaManager(configs.head, time, null, null, null, new AtomicBoolean(false))
   }

-  override def tearDown() {
+  @After
+  def tearDown() {
     replicaManager.shutdown(false)
-    super.tearDown()
   }

   /*
@@ -26,9 +26,8 @@ import kafka.utils.{TestUtils, CoreUtils}
 import org.apache.kafka.common.config.ConfigException
 import org.apache.kafka.common.protocol.SecurityProtocol
 import org.junit.{Assert, Test}
-import org.scalatest.junit.JUnit3Suite
+import org.scalatest.Assertions.intercept

-class KafkaConfigTest extends JUnit3Suite {
+class KafkaConfigTest {

   @Test
   def testLogRetentionTimeHoursProvided() {
@@ -17,7 +17,7 @@

 package kafka.server

-import junit.framework.Assert._
+import org.junit.Assert._
 import kafka.api._
 import kafka.utils.{TestUtils, ZkUtils, CoreUtils}
 import kafka.cluster.Broker

@@ -26,9 +26,9 @@ import kafka.controller.{ControllerChannelManager, ControllerContext, LeaderIsrA
 import kafka.utils.TestUtils._
 import kafka.zk.ZooKeeperTestHarness
 import org.apache.kafka.common.protocol.SecurityProtocol
-import org.scalatest.junit.JUnit3Suite
+import org.junit.{After, Before}

-class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
+class LeaderElectionTest extends ZooKeeperTestHarness {
   val brokerId1 = 0
   val brokerId2 = 1

@@ -36,6 +36,7 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {

   var staleControllerEpochDetected = false

+  @Before
   override def setUp() {
     super.setUp()

@@ -48,6 +49,7 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
     servers ++= List(server1, server2)
   }

+  @After
   override def tearDown() {
     servers.foreach(_.shutdown())
     servers.foreach(server => CoreUtils.rm(server.config.logDirs))
@@ -19,12 +19,11 @@ package kafka.server

 import java.io.File
 import kafka.utils._
-import junit.framework.Assert._
+import org.junit.Assert._
 import java.util.{Random, Properties}
 import kafka.consumer.SimpleConsumer
 import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message}
 import kafka.zk.ZooKeeperTestHarness
-import org.scalatest.junit.JUnit3Suite
 import kafka.admin.AdminUtils
 import kafka.api.{PartitionOffsetRequestInfo, FetchRequestBuilder, OffsetRequest}
 import kafka.utils.TestUtils._

@@ -33,7 +32,7 @@ import org.junit.After
 import org.junit.Before
 import org.junit.Test

-class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness {
+class LogOffsetTest extends ZooKeeperTestHarness {
   val random = new Random()
   var logDir: File = null
   var topicLogDir: File = null
@@ -27,10 +27,10 @@ import kafka.serializer.StringEncoder

 import java.io.File

-import org.scalatest.junit.JUnit3Suite
+import org.junit.{After, Before}
+import org.junit.Assert._

-class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
+class LogRecoveryTest extends ZooKeeperTestHarness {

   val replicaLagTimeMaxMs = 5000L
   val replicaLagMaxMessages = 10L

@@ -69,6 +69,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
       keyEncoder = classOf[IntEncoder].getName)
   }

+  @Before
   override def setUp() {
     super.setUp()

@@ -86,6 +87,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
     updateProducer()
   }

+  @After
   override def tearDown() {
     producer.close()
     for(server <- servers) {
@@ -25,7 +25,6 @@ import kafka.utils.TestUtils._
 import kafka.zk.ZooKeeperTestHarness

 import org.junit.{After, Before, Test}
-import org.scalatest.junit.JUnit3Suite

 import java.util.Properties
 import java.io.File

@@ -33,9 +32,9 @@ import java.io.File
 import scala.util.Random
 import scala.collection._

-import junit.framework.Assert._
+import org.junit.Assert._

-class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
+class OffsetCommitTest extends ZooKeeperTestHarness {
   val random: Random = new Random()
   val group = "test-group"
   val retentionCheckInterval: Long = 100L
@@ -17,7 +17,7 @@

 package kafka.server

-import org.scalatest.junit.JUnit3Suite
+import org.junit.{After, Before}
 import kafka.zk.ZooKeeperTestHarness
 import kafka.utils.TestUtils._
 import kafka.producer.KeyedMessage
@@ -25,11 +25,12 @@ import kafka.serializer.StringEncoder
 import kafka.utils.{TestUtils}
 import kafka.common._

-class ReplicaFetchTest extends JUnit3Suite with ZooKeeperTestHarness {
+class ReplicaFetchTest extends ZooKeeperTestHarness {
   var brokers: Seq[KafkaServer] = null
   val topic1 = "foo"
   val topic2 = "bar"

+  @Before
   override def setUp() {
     super.setUp()
     brokers = createBrokerConfigs(2, zkConnect, false)
@@ -37,6 +38,7 @@ class ReplicaFetchTest extends JUnit3Suite with ZooKeeperTestHarness {
       .map(config => TestUtils.createServer(config))
   }

+  @After
   override def tearDown() {
     brokers.foreach(_.shutdown())
     super.tearDown()

@@ -27,12 +27,11 @@ import java.io.File
 import org.apache.kafka.common.protocol.Errors
 import org.easymock.EasyMock
 import org.I0Itec.zkclient.ZkClient
-import org.scalatest.junit.JUnit3Suite
 import org.junit.Test

 import scala.collection.Map

-class ReplicaManagerTest extends JUnit3Suite {
+class ReplicaManagerTest {

   val topic = "test-topic"
@@ -84,7 +83,7 @@ class ReplicaManagerTest extends JUnit3Suite {

     rm.appendMessages(timeout = 0, requiredAcks = 3, internalTopicsAllowed = false, messagesPerPartition = produceRequest.data, responseCallback = callback)

-    rm.shutdown(false);
+    rm.shutdown(false)

     TestUtils.verifyNonDaemonThreadsStatus

@@ -20,18 +20,18 @@ import java.util.Properties

 import kafka.zk.ZooKeeperTestHarness
 import kafka.utils.{TestUtils, CoreUtils}
-import org.junit.Test
-import org.scalatest.junit.JUnit3Suite
-import junit.framework.Assert._
+import org.junit.{Before, Test}
+import org.junit.Assert._
 import java.io.File

-class ServerGenerateBrokerIdTest extends JUnit3Suite with ZooKeeperTestHarness {
+class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness {
   var props1: Properties = null
   var config1: KafkaConfig = null
   var props2: Properties = null
   var config2: KafkaConfig = null
   val brokerMetaPropsFile = "meta.properties"

+  @Before
   override def setUp() {
     super.setUp()
     props1 = TestUtils.createBrokerConfig(-1, zkConnect)

@@ -27,18 +27,18 @@ import kafka.serializer.StringEncoder

 import java.io.File

-import org.junit.Test
-import org.scalatest.junit.JUnit3Suite
-import junit.framework.Assert._
+import org.junit.{Before, Test}
+import org.junit.Assert._

-class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
+class ServerShutdownTest extends ZooKeeperTestHarness {
   var config: KafkaConfig = null
   val host = "localhost"
   val topic = "test"
   val sent1 = List("hello", "there")
   val sent2 = List("more", "messages")

-  override def setUp(): Unit = {
+  @Before
+  override def setUp() {
     super.setUp()
     val props = TestUtils.createBrokerConfig(0, zkConnect)
     config = KafkaConfig.fromProps(props)

@@ -17,15 +17,14 @@

 package kafka.server

-import org.scalatest.junit.JUnit3Suite
 import kafka.utils.ZkUtils
 import kafka.utils.CoreUtils
 import kafka.utils.TestUtils

 import kafka.zk.ZooKeeperTestHarness
-import junit.framework.Assert._
+import org.junit.Assert._

-class ServerStartupTest extends JUnit3Suite with ZooKeeperTestHarness {
+class ServerStartupTest extends ZooKeeperTestHarness {

   def testBrokerCreatesZKChroot {
     val brokerId = 0

@@ -22,18 +22,17 @@ import kafka.cluster.Replica
 import kafka.common.TopicAndPartition
 import kafka.log.Log
 import kafka.message.{MessageSet, ByteBufferMessageSet, Message}
+import org.junit.{After, Before}

 import scala.Some
 import java.util.{Properties, Collections}
 import java.util.concurrent.atomic.AtomicBoolean
 import collection.JavaConversions._

 import org.easymock.EasyMock
 import org.I0Itec.zkclient.ZkClient
-import org.scalatest.junit.JUnit3Suite
-import junit.framework.Assert._
+import org.junit.Assert._

-class SimpleFetchTest extends JUnit3Suite {
+class SimpleFetchTest {

   val replicaLagTimeMaxMs = 100L
   val replicaFetchWaitMaxMs = 100
@@ -63,9 +62,8 @@ class SimpleFetchTest extends JUnit3Suite {

   var replicaManager: ReplicaManager = null

-  override def setUp() {
-    super.setUp()
+  @Before
+  def setUp() {
     // create nice mock since we don't particularly care about zkclient calls
     val zkClient = EasyMock.createNiceMock(classOf[ZkClient])
     EasyMock.replay(zkClient)
@@ -117,9 +115,9 @@ class SimpleFetchTest extends JUnit3Suite {
     partition.inSyncReplicas = allReplicas.toSet
   }

-  override def tearDown() {
+  @After
+  def tearDown() {
     replicaManager.shutdown(false)
-    super.tearDown()
   }

   /**

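SimpleFetchTest stops extending any harness at all, so its setUp no longer chains to super; it only builds a "nice" EasyMock ZkClient, which returns default values for any unexpected call instead of failing the test. A hedged sketch of that nice-mock idiom; the Registry trait below is a stand-in for ZkClient, not a real Kafka type:

import org.easymock.EasyMock
import org.junit.{Before, Test}
import org.junit.Assert._

// Illustrative collaborator whose calls the test does not care about.
trait Registry {
  def lookup(path: String): String
}

class NiceMockSketch {
  var registry: Registry = null

  @Before
  def setUp() {
    // A nice mock answers unstubbed calls with null/0/false rather than
    // throwing, so the test stays focused on the code under test.
    registry = EasyMock.createNiceMock(classOf[Registry])
    EasyMock.replay(registry)
  }

  @Test
  def testUnstubbedCallsReturnDefaults() {
    assertNull(registry.lookup("/brokers"))
  }
}
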
@@ -19,7 +19,7 @@ package kafka.utils

 import java.util.concurrent.TimeUnit

-import junit.framework.Assert._
+import org.junit.Assert._
 import org.junit.Test

 class ByteBoundedBlockingQueueTest {

@@ -17,7 +17,7 @@

 package kafka.utils

-import junit.framework.Assert._
+import org.junit.Assert._
 import org.junit.Test

 class CommandLineUtilsTest {

@@ -16,7 +16,7 @@
 */
 package kafka.utils

-import junit.framework.Assert._
+import org.junit.Assert._
 import org.scalatest.Assertions
 import org.junit.{Test, After, Before}

@@ -16,7 +16,7 @@
 */
 package kafka.utils

-import junit.framework.Assert._
+import org.junit.Assert._
 import org.junit.{Test, After, Before}

 class JsonTest {

@@ -22,13 +22,12 @@ import kafka.server.{ReplicaFetcherManager, KafkaConfig}
 import kafka.api.LeaderAndIsr
 import kafka.zk.ZooKeeperTestHarness
 import kafka.common.TopicAndPartition
-import org.scalatest.junit.JUnit3Suite
 import org.junit.Assert._
-import org.junit.Test
+import org.junit.{Before, Test}
 import org.easymock.EasyMock

-class ReplicationUtilsTest extends JUnit3Suite with ZooKeeperTestHarness {
+class ReplicationUtilsTest extends ZooKeeperTestHarness {
   val topic = "my-topic-test"
   val partitionId = 0
   val brokerId = 1
@@ -45,7 +44,7 @@ class ReplicationUtilsTest extends JUnit3Suite with ZooKeeperTestHarness {

   val topicDataLeaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(LeaderAndIsr(1,leaderEpoch,List(1,2),0), controllerEpoch)

+  @Before
   override def setUp() {
     super.setUp()
     ZkUtils.createPersistentPath(zkClient,topicPath,topicData)

@@ -16,7 +16,7 @@
 */
 package kafka.utils

-import junit.framework.Assert._
+import org.junit.Assert._
 import java.util.concurrent.atomic._
 import org.junit.{Test, After, Before}
 import kafka.utils.TestUtils.retry

@@ -43,8 +43,7 @@ import kafka.admin.AdminUtils
 import kafka.producer.ProducerConfig
 import kafka.log._

-import junit.framework.AssertionFailedError
-import junit.framework.Assert._
+import org.junit.Assert._
 import org.apache.kafka.clients.producer.KafkaProducer

 import scala.collection.Map
@@ -595,7 +594,7 @@ object TestUtils extends Logging {
         block
         return
       } catch {
-        case e: AssertionFailedError =>
+        case e: AssertionError =>
           val ellapsed = System.currentTimeMillis - startTime
           if(ellapsed > maxWaitMs) {
             throw e

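The catch-clause change above works because org.junit.Assert failures throw java.lang.AssertionError, and in the JUnit 4 jar junit.framework.AssertionFailedError is a subclass of AssertionError, so one catch covers both assertion styles. A simplified sketch of the retry idiom, not the full TestUtils implementation (the fixed 100 ms backoff here is an assumption):

import org.junit.Assert._

object RetrySketch {
  // Re-run an assertion block until it passes or the deadline expires;
  // on timeout, rethrow the last assertion failure.
  def retry(maxWaitMs: Long)(block: => Unit) {
    val startTime = System.currentTimeMillis
    while (true) {
      try {
        block
        return
      } catch {
        case e: AssertionError =>
          if (System.currentTimeMillis - startTime > maxWaitMs)
            throw e
          Thread.sleep(100)  // back off briefly before the next attempt
      }
    }
  }
}

// Usage: RetrySketch.retry(5000) { assertTrue(conditionEventuallyHolds) }
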
@@ -16,7 +16,7 @@
 */
 package kafka.utils.timer

-import junit.framework.Assert._
+import org.junit.Assert._
 import java.util.concurrent.atomic._
 import org.junit.{Test, After, Before}

@@ -18,7 +18,7 @@ package kafka.utils.timer

 import java.util.concurrent.{CountDownLatch, ExecutorService, Executors, TimeUnit}

-import junit.framework.Assert._
+import org.junit.Assert._
 import java.util.concurrent.atomic._
 import org.junit.{Test, After, Before}

@@ -18,13 +18,11 @@
 package kafka.zk

 import kafka.consumer.ConsumerConfig
 import org.I0Itec.zkclient.ZkClient
 import kafka.utils.ZkUtils
 import kafka.utils.TestUtils
-import org.junit.Assert
-import org.scalatest.junit.JUnit3Suite

-class ZKEphemeralTest extends JUnit3Suite with ZooKeeperTestHarness {
+class ZKEphemeralTest extends ZooKeeperTestHarness {
   var zkSessionTimeoutMs = 1000

   def testEphemeralNodeCleanup = {

@@ -17,13 +17,12 @@

 package kafka.zk

-import junit.framework.Assert
 import kafka.consumer.ConsumerConfig
 import kafka.utils.{ZkPath, TestUtils, ZkUtils}
 import org.apache.kafka.common.config.ConfigException
-import org.scalatest.junit.JUnit3Suite
+import org.junit.Assert._

-class ZKPathTest extends JUnit3Suite with ZooKeeperTestHarness {
+class ZKPathTest extends ZooKeeperTestHarness {

   val path: String = "/some_dir"
   val zkSessionTimeoutMs = 1000
@@ -54,7 +53,7 @@ class ZKPathTest extends JUnit3Suite with ZooKeeperTestHarness {
       case exception: Throwable => fail("Failed to create persistent path")
     }

-    Assert.assertTrue("Failed to create persistent path", ZkUtils.pathExists(zkClient, path))
+    assertTrue("Failed to create persistent path", ZkUtils.pathExists(zkClient, path))
   }

   def testMakeSurePersistsPathExistsThrowsException {
@@ -82,7 +81,7 @@ class ZKPathTest extends JUnit3Suite with ZooKeeperTestHarness {
       case exception: Throwable => fail("Failed to create persistent path")
     }

-    Assert.assertTrue("Failed to create persistent path", ZkUtils.pathExists(zkClient, path))
+    assertTrue("Failed to create persistent path", ZkUtils.pathExists(zkClient, path))
   }

   def testCreateEphemeralPathThrowsException {
@@ -110,7 +109,7 @@ class ZKPathTest extends JUnit3Suite with ZooKeeperTestHarness {
       case exception: Throwable => fail("Failed to create ephemeral path")
     }

-    Assert.assertTrue("Failed to create ephemeral path", ZkUtils.pathExists(zkClient, path))
+    assertTrue("Failed to create ephemeral path", ZkUtils.pathExists(zkClient, path))
   }

   def testCreatePersistentSequentialThrowsException {
@@ -140,6 +139,6 @@ class ZKPathTest extends JUnit3Suite with ZooKeeperTestHarness {
       case exception: Throwable => fail("Failed to create persistent path")
     }

-    Assert.assertTrue("Failed to create persistent path", ZkUtils.pathExists(zkClient, actualPath))
+    assertTrue("Failed to create persistent path", ZkUtils.pathExists(zkClient, actualPath))
   }
 }

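The ZKPathTest edits show the second half of the assertion migration: replacing the class import with the underscore (static-member) import lets every call site drop the Assert. qualifier. A tiny illustrative sketch of the difference (the class name is made up):

import org.junit.Assert._
import org.junit.Test

class StaticImportSketch {
  @Test
  def testQualifierIsUnnecessary() {
    // With `import org.junit.Assert._`, assertTrue/assertEquals/fail
    // resolve without the `Assert.` prefix, matching the edits above.
    assertTrue("condition should hold", 1 + 1 == 2)
  }
}
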
@@ -17,11 +17,12 @@

 package kafka.zk

-import org.scalatest.junit.JUnit3Suite
 import org.I0Itec.zkclient.ZkClient
 import kafka.utils.{ZkUtils, CoreUtils}
+import org.junit.{After, Before}
+import org.scalatest.junit.JUnitSuite

-trait ZooKeeperTestHarness extends JUnit3Suite {
+trait ZooKeeperTestHarness extends JUnitSuite {
   var zkPort: Int = -1
   var zookeeper: EmbeddedZookeeper = null
   var zkClient: ZkClient = null
@@ -30,17 +31,17 @@ trait ZooKeeperTestHarness extends JUnit3Suite {

   def zkConnect: String = "127.0.0.1:" + zkPort

-  override def setUp() {
-    super.setUp
+  @Before
+  def setUp() {
     zookeeper = new EmbeddedZookeeper()
     zkPort = zookeeper.port
     zkClient = ZkUtils.createZkClient(zkConnect, zkSessionTimeout, zkConnectionTimeout)
   }

-  override def tearDown() {
+  @After
+  def tearDown() {
     CoreUtils.swallow(zkClient.close())
     CoreUtils.swallow(zookeeper.shutdown())
-    super.tearDown
   }

 }
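With the harness itself on JUnit4 (extending JUnitSuite only so ScalaTest tooling still recognizes it), the subclass changes earlier in this commit fall into place: JUnit4 does not treat an un-annotated override of a @Before method as a fixture method, so every override must carry its own @Before/@After and chain to super explicitly. A hedged sketch of a hypothetical subclass (ExampleZkTest is not in the tree):

import kafka.zk.ZooKeeperTestHarness
import org.junit.{After, Before, Test}
import org.junit.Assert._

class ExampleZkTest extends ZooKeeperTestHarness {
  var ready = false

  @Before
  override def setUp() {
    super.setUp()  // starts the EmbeddedZookeeper and creates zkClient
    ready = true
  }

  @Test
  def testHarnessProvidesClient() {
    assertTrue(ready)
    assertNotNull(zkClient)
  }

  @After
  override def tearDown() {
    ready = false
    super.tearDown()  // closes zkClient and shuts ZooKeeper down
  }
}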