mirror of https://github.com/apache/kafka.git
Merge remote-tracking branch 'origin/trunk' into copycat
This commit is contained in:
commit
c0e5fdcff5
|
@ -19,6 +19,7 @@ import org.apache.kafka.clients.consumer._
|
||||||
import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
|
import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
|
||||||
import org.apache.kafka.common.TopicPartition
|
import org.apache.kafka.common.TopicPartition
|
||||||
import org.junit.Assert._
|
import org.junit.Assert._
|
||||||
|
import org.junit.Before
|
||||||
|
|
||||||
import scala.collection.JavaConversions._
|
import scala.collection.JavaConversions._
|
||||||
|
|
||||||
|
@ -52,6 +53,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
|
||||||
.map(KafkaConfig.fromProps(_, serverConfig))
|
.map(KafkaConfig.fromProps(_, serverConfig))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Before
|
||||||
override def setUp() {
|
override def setUp() {
|
||||||
super.setUp()
|
super.setUp()
|
||||||
|
|
||||||
|
|
|
@ -25,6 +25,7 @@ import kafka.server.KafkaConfig
|
||||||
|
|
||||||
import java.util.ArrayList
|
import java.util.ArrayList
|
||||||
import org.junit.Assert._
|
import org.junit.Assert._
|
||||||
|
import org.junit.Before
|
||||||
|
|
||||||
import scala.collection.JavaConverters._
|
import scala.collection.JavaConverters._
|
||||||
import kafka.coordinator.ConsumerCoordinator
|
import kafka.coordinator.ConsumerCoordinator
|
||||||
|
@ -56,6 +57,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
|
||||||
this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
|
this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
|
||||||
this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
|
this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
|
||||||
|
|
||||||
|
@Before
|
||||||
override def setUp() {
|
override def setUp() {
|
||||||
super.setUp()
|
super.setUp()
|
||||||
|
|
||||||
|
|
|
@ -25,6 +25,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer
|
||||||
import org.apache.kafka.clients.producer.KafkaProducer
|
import org.apache.kafka.clients.producer.KafkaProducer
|
||||||
import kafka.server.{OffsetManager, KafkaConfig}
|
import kafka.server.{OffsetManager, KafkaConfig}
|
||||||
import kafka.integration.KafkaServerTestHarness
|
import kafka.integration.KafkaServerTestHarness
|
||||||
|
import org.junit.{After, Before}
|
||||||
import scala.collection.mutable.Buffer
|
import scala.collection.mutable.Buffer
|
||||||
import kafka.coordinator.ConsumerCoordinator
|
import kafka.coordinator.ConsumerCoordinator
|
||||||
|
|
||||||
|
@ -49,6 +50,7 @@ trait IntegrationTestHarness extends KafkaServerTestHarness {
|
||||||
cfgs.map(KafkaConfig.fromProps)
|
cfgs.map(KafkaConfig.fromProps)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Before
|
||||||
override def setUp() {
|
override def setUp() {
|
||||||
super.setUp()
|
super.setUp()
|
||||||
producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapUrl)
|
producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapUrl)
|
||||||
|
@ -70,7 +72,8 @@ trait IntegrationTestHarness extends KafkaServerTestHarness {
|
||||||
servers,
|
servers,
|
||||||
servers(0).consumerCoordinator.offsetsTopicConfigs)
|
servers(0).consumerCoordinator.offsetsTopicConfigs)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
override def tearDown() {
|
override def tearDown() {
|
||||||
producers.foreach(_.close())
|
producers.foreach(_.close())
|
||||||
consumers.foreach(_.close())
|
consumers.foreach(_.close())
|
||||||
|
|
|
@ -22,7 +22,7 @@ import kafka.utils.{ShutdownableThread, TestUtils}
|
||||||
import org.apache.kafka.clients.producer._
|
import org.apache.kafka.clients.producer._
|
||||||
import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
|
import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
|
||||||
import org.junit.Assert._
|
import org.junit.Assert._
|
||||||
import org.junit.Test
|
import org.junit.{After, Before, Test}
|
||||||
|
|
||||||
class ProducerBounceTest extends KafkaServerTestHarness {
|
class ProducerBounceTest extends KafkaServerTestHarness {
|
||||||
private val producerBufferSize = 30000
|
private val producerBufferSize = 30000
|
||||||
|
@ -62,6 +62,7 @@ class ProducerBounceTest extends KafkaServerTestHarness {
|
||||||
private val topic1 = "topic-1"
|
private val topic1 = "topic-1"
|
||||||
private val topic2 = "topic-2"
|
private val topic2 = "topic-2"
|
||||||
|
|
||||||
|
@Before
|
||||||
override def setUp() {
|
override def setUp() {
|
||||||
super.setUp()
|
super.setUp()
|
||||||
|
|
||||||
|
@ -70,6 +71,7 @@ class ProducerBounceTest extends KafkaServerTestHarness {
|
||||||
producer3 = TestUtils.createNewProducer(brokerList, acks = -1, blockOnBufferFull = false, bufferSize = producerBufferSize)
|
producer3 = TestUtils.createNewProducer(brokerList, acks = -1, blockOnBufferFull = false, bufferSize = producerBufferSize)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
override def tearDown() {
|
override def tearDown() {
|
||||||
if (producer1 != null) producer1.close
|
if (producer1 != null) producer1.close
|
||||||
if (producer2 != null) producer2.close
|
if (producer2 != null) producer2.close
|
||||||
|
|
|
@ -19,7 +19,6 @@ package kafka.api.test
|
||||||
|
|
||||||
import java.util.{Properties, Collection, ArrayList}
|
import java.util.{Properties, Collection, ArrayList}
|
||||||
|
|
||||||
import org.scalatest.junit.JUnit3Suite
|
|
||||||
import org.junit.runners.Parameterized
|
import org.junit.runners.Parameterized
|
||||||
import org.junit.runner.RunWith
|
import org.junit.runner.RunWith
|
||||||
import org.junit.runners.Parameterized.Parameters
|
import org.junit.runners.Parameterized.Parameters
|
||||||
|
@ -36,7 +35,7 @@ import kafka.utils.{CoreUtils, TestUtils}
|
||||||
|
|
||||||
|
|
||||||
@RunWith(value = classOf[Parameterized])
|
@RunWith(value = classOf[Parameterized])
|
||||||
class ProducerCompressionTest(compression: String) extends JUnit3Suite with ZooKeeperTestHarness {
|
class ProducerCompressionTest(compression: String) extends ZooKeeperTestHarness {
|
||||||
private val brokerId = 0
|
private val brokerId = 0
|
||||||
private var server: KafkaServer = null
|
private var server: KafkaServer = null
|
||||||
|
|
||||||
|
|
|
@ -17,22 +17,20 @@
|
||||||
|
|
||||||
package kafka.api
|
package kafka.api
|
||||||
|
|
||||||
import org.junit.Test
|
import java.util.concurrent.{ExecutionException, TimeUnit, TimeoutException}
|
||||||
import org.junit.Assert._
|
|
||||||
|
|
||||||
import java.util.{Properties, Random}
|
import java.util.{Properties, Random}
|
||||||
import java.util.concurrent.{TimeoutException, TimeUnit, ExecutionException}
|
|
||||||
|
|
||||||
import kafka.common.Topic
|
import kafka.common.Topic
|
||||||
import kafka.consumer.SimpleConsumer
|
import kafka.consumer.SimpleConsumer
|
||||||
import kafka.integration.KafkaServerTestHarness
|
import kafka.integration.KafkaServerTestHarness
|
||||||
import kafka.server.KafkaConfig
|
import kafka.server.KafkaConfig
|
||||||
import kafka.utils.{ShutdownableThread, TestUtils}
|
import kafka.utils.{ShutdownableThread, TestUtils}
|
||||||
|
|
||||||
import org.apache.kafka.common.KafkaException
|
|
||||||
import org.apache.kafka.common.errors.{InvalidTopicException, NotEnoughReplicasException, NotEnoughReplicasAfterAppendException}
|
|
||||||
import org.apache.kafka.clients.producer._
|
import org.apache.kafka.clients.producer._
|
||||||
import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
|
import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
|
||||||
|
import org.apache.kafka.common.KafkaException
|
||||||
|
import org.apache.kafka.common.errors.{InvalidTopicException, NotEnoughReplicasAfterAppendException, NotEnoughReplicasException}
|
||||||
|
import org.junit.Assert._
|
||||||
|
import org.junit.{After, Before, Test}
|
||||||
|
|
||||||
class ProducerFailureHandlingTest extends KafkaServerTestHarness {
|
class ProducerFailureHandlingTest extends KafkaServerTestHarness {
|
||||||
private val producerBufferSize = 30000
|
private val producerBufferSize = 30000
|
||||||
|
@ -61,6 +59,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
|
||||||
private val topic1 = "topic-1"
|
private val topic1 = "topic-1"
|
||||||
private val topic2 = "topic-2"
|
private val topic2 = "topic-2"
|
||||||
|
|
||||||
|
@Before
|
||||||
override def setUp() {
|
override def setUp() {
|
||||||
super.setUp()
|
super.setUp()
|
||||||
|
|
||||||
|
@ -69,6 +68,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
|
||||||
producer3 = TestUtils.createNewProducer(brokerList, acks = -1, blockOnBufferFull = false, bufferSize = producerBufferSize)
|
producer3 = TestUtils.createNewProducer(brokerList, acks = -1, blockOnBufferFull = false, bufferSize = producerBufferSize)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
override def tearDown() {
|
override def tearDown() {
|
||||||
if (producer1 != null) producer1.close
|
if (producer1 != null) producer1.close
|
||||||
if (producer2 != null) producer2.close
|
if (producer2 != null) producer2.close
|
||||||
|
|
|
@ -30,11 +30,9 @@ import org.apache.kafka.common.config.ConfigException
|
||||||
import org.apache.kafka.common.errors.SerializationException
|
import org.apache.kafka.common.errors.SerializationException
|
||||||
import org.apache.kafka.common.serialization.ByteArraySerializer
|
import org.apache.kafka.common.serialization.ByteArraySerializer
|
||||||
import org.junit.Assert._
|
import org.junit.Assert._
|
||||||
import org.junit.Test
|
import org.junit.{After, Before, Test}
|
||||||
import org.scalatest.junit.JUnit3Suite
|
|
||||||
|
|
||||||
|
class ProducerSendTest extends KafkaServerTestHarness {
|
||||||
class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness {
|
|
||||||
val numServers = 2
|
val numServers = 2
|
||||||
|
|
||||||
val overridingProps = new Properties()
|
val overridingProps = new Properties()
|
||||||
|
@ -49,6 +47,7 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness {
|
||||||
private val topic = "topic"
|
private val topic = "topic"
|
||||||
private val numRecords = 100
|
private val numRecords = 100
|
||||||
|
|
||||||
|
@Before
|
||||||
override def setUp() {
|
override def setUp() {
|
||||||
super.setUp()
|
super.setUp()
|
||||||
|
|
||||||
|
@ -57,6 +56,7 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness {
|
||||||
consumer2 = new SimpleConsumer("localhost", servers(1).boundPort(), 100, 1024*1024, "")
|
consumer2 = new SimpleConsumer("localhost", servers(1).boundPort(), 100, 1024*1024, "")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
override def tearDown() {
|
override def tearDown() {
|
||||||
consumer1.close()
|
consumer1.close()
|
||||||
consumer2.close()
|
consumer2.close()
|
||||||
|
|
|
@ -21,7 +21,7 @@ import java.security.Permission
|
||||||
|
|
||||||
import kafka.server.KafkaConfig
|
import kafka.server.KafkaConfig
|
||||||
import org.junit.{After, Before, Test}
|
import org.junit.{After, Before, Test}
|
||||||
import junit.framework.Assert._
|
import org.junit.Assert._
|
||||||
|
|
||||||
class KafkaTest {
|
class KafkaTest {
|
||||||
|
|
||||||
|
|
|
@ -17,17 +17,17 @@
|
||||||
|
|
||||||
package kafka.admin
|
package kafka.admin
|
||||||
|
|
||||||
|
import org.junit.Assert._
|
||||||
import org.apache.kafka.common.protocol.SecurityProtocol
|
import org.apache.kafka.common.protocol.SecurityProtocol
|
||||||
import org.scalatest.junit.JUnit3Suite
|
|
||||||
import kafka.zk.ZooKeeperTestHarness
|
import kafka.zk.ZooKeeperTestHarness
|
||||||
import kafka.utils.TestUtils._
|
import kafka.utils.TestUtils._
|
||||||
import junit.framework.Assert._
|
|
||||||
import kafka.utils.{ZkUtils, CoreUtils, TestUtils}
|
import kafka.utils.{ZkUtils, CoreUtils, TestUtils}
|
||||||
import kafka.cluster.Broker
|
import kafka.cluster.Broker
|
||||||
import kafka.client.ClientUtils
|
import kafka.client.ClientUtils
|
||||||
import kafka.server.{KafkaConfig, KafkaServer}
|
import kafka.server.{KafkaConfig, KafkaServer}
|
||||||
|
import org.junit.{After, Before}
|
||||||
|
|
||||||
class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness {
|
class AddPartitionsTest extends ZooKeeperTestHarness {
|
||||||
var configs: Seq[KafkaConfig] = null
|
var configs: Seq[KafkaConfig] = null
|
||||||
var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
|
var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
|
||||||
var brokers: Seq[Broker] = Seq.empty[Broker]
|
var brokers: Seq[Broker] = Seq.empty[Broker]
|
||||||
|
@ -39,6 +39,7 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness {
|
||||||
val topic3 = "new-topic3"
|
val topic3 = "new-topic3"
|
||||||
val topic4 = "new-topic4"
|
val topic4 = "new-topic4"
|
||||||
|
|
||||||
|
@Before
|
||||||
override def setUp() {
|
override def setUp() {
|
||||||
super.setUp()
|
super.setUp()
|
||||||
|
|
||||||
|
@ -54,6 +55,7 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness {
|
||||||
createTopic(zkClient, topic4, partitionReplicaAssignment = Map(0->Seq(0,3)), servers = servers)
|
createTopic(zkClient, topic4, partitionReplicaAssignment = Map(0->Seq(0,3)), servers = servers)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
override def tearDown() {
|
override def tearDown() {
|
||||||
servers.foreach(_.shutdown())
|
servers.foreach(_.shutdown())
|
||||||
servers.foreach(server => CoreUtils.rm(server.config.logDirs))
|
servers.foreach(server => CoreUtils.rm(server.config.logDirs))
|
||||||
|
|
|
@ -18,7 +18,6 @@ package kafka.admin
|
||||||
|
|
||||||
import junit.framework.Assert._
|
import junit.framework.Assert._
|
||||||
import org.junit.Test
|
import org.junit.Test
|
||||||
import org.scalatest.junit.JUnit3Suite
|
|
||||||
import java.util.Properties
|
import java.util.Properties
|
||||||
import kafka.utils._
|
import kafka.utils._
|
||||||
import kafka.log._
|
import kafka.log._
|
||||||
|
@ -30,7 +29,7 @@ import java.io.File
|
||||||
import TestUtils._
|
import TestUtils._
|
||||||
|
|
||||||
|
|
||||||
class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
|
class AdminTest extends ZooKeeperTestHarness with Logging {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
def testReplicaAssignment() {
|
def testReplicaAssignment() {
|
||||||
|
|
|
@ -19,15 +19,10 @@ package kafka.admin
|
||||||
import junit.framework.Assert._
|
import junit.framework.Assert._
|
||||||
import kafka.admin.ConfigCommand.ConfigCommandOptions
|
import kafka.admin.ConfigCommand.ConfigCommandOptions
|
||||||
import org.junit.Test
|
import org.junit.Test
|
||||||
import org.scalatest.junit.JUnit3Suite
|
|
||||||
import kafka.utils.Logging
|
import kafka.utils.Logging
|
||||||
import kafka.utils.TestUtils
|
|
||||||
import kafka.zk.ZooKeeperTestHarness
|
import kafka.zk.ZooKeeperTestHarness
|
||||||
import kafka.server.{ConfigType, OffsetManager, KafkaConfig}
|
|
||||||
import kafka.admin.TopicCommand.TopicCommandOptions
|
|
||||||
import kafka.utils.ZkUtils
|
|
||||||
|
|
||||||
class ConfigCommandTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
|
class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
|
||||||
@Test
|
@Test
|
||||||
def testArgumentParse() {
|
def testArgumentParse() {
|
||||||
// Should parse correctly
|
// Should parse correctly
|
||||||
|
|
|
@ -16,7 +16,6 @@
|
||||||
*/
|
*/
|
||||||
package kafka.admin
|
package kafka.admin
|
||||||
|
|
||||||
import org.scalatest.junit.JUnit3Suite
|
|
||||||
import kafka.utils._
|
import kafka.utils._
|
||||||
import kafka.server.KafkaConfig
|
import kafka.server.KafkaConfig
|
||||||
import org.junit.Test
|
import org.junit.Test
|
||||||
|
@ -25,7 +24,7 @@ import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer}
|
||||||
import kafka.integration.KafkaServerTestHarness
|
import kafka.integration.KafkaServerTestHarness
|
||||||
|
|
||||||
|
|
||||||
class DeleteConsumerGroupTest extends JUnit3Suite with KafkaServerTestHarness {
|
class DeleteConsumerGroupTest extends KafkaServerTestHarness {
|
||||||
def generateConfigs() = TestUtils.createBrokerConfigs(3, zkConnect, false, true).map(KafkaConfig.fromProps)
|
def generateConfigs() = TestUtils.createBrokerConfigs(3, zkConnect, false, true).map(KafkaConfig.fromProps)
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
|
@ -17,7 +17,6 @@
|
||||||
package kafka.admin
|
package kafka.admin
|
||||||
|
|
||||||
import kafka.log.Log
|
import kafka.log.Log
|
||||||
import org.scalatest.junit.JUnit3Suite
|
|
||||||
import kafka.zk.ZooKeeperTestHarness
|
import kafka.zk.ZooKeeperTestHarness
|
||||||
import junit.framework.Assert._
|
import junit.framework.Assert._
|
||||||
import kafka.utils.{ZkUtils, TestUtils}
|
import kafka.utils.{ZkUtils, TestUtils}
|
||||||
|
@ -26,7 +25,7 @@ import org.junit.Test
|
||||||
import java.util.Properties
|
import java.util.Properties
|
||||||
import kafka.common.{TopicAlreadyMarkedForDeletionException, TopicAndPartition}
|
import kafka.common.{TopicAlreadyMarkedForDeletionException, TopicAndPartition}
|
||||||
|
|
||||||
class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
|
class DeleteTopicTest extends ZooKeeperTestHarness {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
def testDeleteTopicWithAllAliveReplicas() {
|
def testDeleteTopicWithAllAliveReplicas() {
|
||||||
|
|
|
@ -18,16 +18,15 @@ package kafka.admin
|
||||||
|
|
||||||
import junit.framework.Assert._
|
import junit.framework.Assert._
|
||||||
import org.junit.Test
|
import org.junit.Test
|
||||||
import org.scalatest.junit.JUnit3Suite
|
|
||||||
import kafka.utils.Logging
|
import kafka.utils.Logging
|
||||||
import kafka.utils.TestUtils
|
import kafka.utils.TestUtils
|
||||||
import kafka.zk.ZooKeeperTestHarness
|
import kafka.zk.ZooKeeperTestHarness
|
||||||
import kafka.server.{ConfigType, OffsetManager, KafkaConfig}
|
import kafka.server.ConfigType
|
||||||
import kafka.admin.TopicCommand.TopicCommandOptions
|
import kafka.admin.TopicCommand.TopicCommandOptions
|
||||||
import kafka.utils.ZkUtils
|
import kafka.utils.ZkUtils
|
||||||
import kafka.coordinator.ConsumerCoordinator
|
import kafka.coordinator.ConsumerCoordinator
|
||||||
|
|
||||||
class TopicCommandTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
|
class TopicCommandTest extends ZooKeeperTestHarness with Logging {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
def testConfigPreservationAcrossPartitionAlteration() {
|
def testConfigPreservationAcrossPartitionAlteration() {
|
||||||
|
|
|
@ -19,7 +19,7 @@ package kafka.api
|
||||||
|
|
||||||
import org.junit._
|
import org.junit._
|
||||||
import org.scalatest.junit.JUnitSuite
|
import org.scalatest.junit.JUnitSuite
|
||||||
import junit.framework.Assert._
|
import org.junit.Assert._
|
||||||
import scala.util.Random
|
import scala.util.Random
|
||||||
import java.nio.ByteBuffer
|
import java.nio.ByteBuffer
|
||||||
import kafka.common.KafkaException
|
import kafka.common.KafkaException
|
||||||
|
|
|
@ -32,7 +32,7 @@ import java.nio.ByteBuffer
|
||||||
import org.apache.kafka.common.protocol.SecurityProtocol
|
import org.apache.kafka.common.protocol.SecurityProtocol
|
||||||
import org.junit._
|
import org.junit._
|
||||||
import org.scalatest.junit.JUnitSuite
|
import org.scalatest.junit.JUnitSuite
|
||||||
import junit.framework.Assert._
|
import org.junit.Assert._
|
||||||
|
|
||||||
|
|
||||||
object SerializationTestUtils {
|
object SerializationTestUtils {
|
||||||
|
|
|
@ -22,11 +22,10 @@ import java.nio.ByteBuffer
|
||||||
import kafka.utils.Logging
|
import kafka.utils.Logging
|
||||||
import org.apache.kafka.common.protocol.SecurityProtocol
|
import org.apache.kafka.common.protocol.SecurityProtocol
|
||||||
import org.junit.Test
|
import org.junit.Test
|
||||||
import org.scalatest.junit.JUnit3Suite
|
|
||||||
|
|
||||||
import scala.collection.mutable
|
import scala.collection.mutable
|
||||||
|
|
||||||
class BrokerEndPointTest extends JUnit3Suite with Logging {
|
class BrokerEndPointTest extends Logging {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
def testSerDe() = {
|
def testSerDe() = {
|
||||||
|
|
|
@ -17,7 +17,7 @@
|
||||||
|
|
||||||
package kafka.common
|
package kafka.common
|
||||||
|
|
||||||
import junit.framework.Assert._
|
import org.junit.Assert._
|
||||||
import collection.mutable.ArrayBuffer
|
import collection.mutable.ArrayBuffer
|
||||||
import org.junit.Test
|
import org.junit.Test
|
||||||
import kafka.producer.ProducerConfig
|
import kafka.producer.ProducerConfig
|
||||||
|
|
|
@ -17,7 +17,7 @@
|
||||||
|
|
||||||
package kafka.common
|
package kafka.common
|
||||||
|
|
||||||
import junit.framework.Assert._
|
import org.junit.Assert._
|
||||||
import collection.mutable.ArrayBuffer
|
import collection.mutable.ArrayBuffer
|
||||||
import org.junit.Test
|
import org.junit.Test
|
||||||
|
|
||||||
|
|
|
@ -18,22 +18,20 @@
|
||||||
|
|
||||||
package kafka.consumer
|
package kafka.consumer
|
||||||
|
|
||||||
import java.util.Properties
|
|
||||||
import java.util.concurrent._
|
import java.util.concurrent._
|
||||||
import java.util.concurrent.atomic._
|
import java.util.concurrent.atomic._
|
||||||
import scala.collection._
|
import scala.collection._
|
||||||
import junit.framework.Assert._
|
import org.junit.Assert._
|
||||||
|
|
||||||
import kafka.message._
|
import kafka.message._
|
||||||
import kafka.server._
|
import kafka.server._
|
||||||
import kafka.utils.TestUtils._
|
import kafka.utils.TestUtils._
|
||||||
import kafka.utils._
|
import kafka.utils._
|
||||||
import org.junit.Test
|
import org.junit.{Before, Test}
|
||||||
import kafka.serializer._
|
import kafka.serializer._
|
||||||
import org.scalatest.junit.JUnit3Suite
|
|
||||||
import kafka.integration.KafkaServerTestHarness
|
import kafka.integration.KafkaServerTestHarness
|
||||||
|
|
||||||
class ConsumerIteratorTest extends JUnit3Suite with KafkaServerTestHarness {
|
class ConsumerIteratorTest extends KafkaServerTestHarness {
|
||||||
|
|
||||||
val numNodes = 1
|
val numNodes = 1
|
||||||
|
|
||||||
|
@ -49,6 +47,7 @@ class ConsumerIteratorTest extends JUnit3Suite with KafkaServerTestHarness {
|
||||||
|
|
||||||
def consumerConfig = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer0))
|
def consumerConfig = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer0))
|
||||||
|
|
||||||
|
@Before
|
||||||
override def setUp() {
|
override def setUp() {
|
||||||
super.setUp()
|
super.setUp()
|
||||||
topicInfos = configs.map(c => new PartitionTopicInfo(topic,
|
topicInfos = configs.map(c => new PartitionTopicInfo(topic,
|
||||||
|
|
|
@ -17,18 +17,17 @@
|
||||||
|
|
||||||
package kafka.consumer
|
package kafka.consumer
|
||||||
|
|
||||||
import org.scalatest.junit.JUnit3Suite
|
|
||||||
import org.easymock.EasyMock
|
import org.easymock.EasyMock
|
||||||
import org.I0Itec.zkclient.ZkClient
|
import org.I0Itec.zkclient.ZkClient
|
||||||
import org.apache.zookeeper.data.Stat
|
import org.apache.zookeeper.data.Stat
|
||||||
import kafka.utils.{TestUtils, Logging, ZkUtils, Json}
|
import kafka.utils.{TestUtils, Logging, ZkUtils, Json}
|
||||||
import junit.framework.Assert._
|
import org.junit.Assert._
|
||||||
import kafka.common.TopicAndPartition
|
import kafka.common.TopicAndPartition
|
||||||
import kafka.consumer.PartitionAssignorTest.StaticSubscriptionInfo
|
import kafka.consumer.PartitionAssignorTest.StaticSubscriptionInfo
|
||||||
import kafka.consumer.PartitionAssignorTest.Scenario
|
import kafka.consumer.PartitionAssignorTest.Scenario
|
||||||
import kafka.consumer.PartitionAssignorTest.WildcardSubscriptionInfo
|
import kafka.consumer.PartitionAssignorTest.WildcardSubscriptionInfo
|
||||||
|
|
||||||
class PartitionAssignorTest extends JUnit3Suite with Logging {
|
class PartitionAssignorTest extends Logging {
|
||||||
|
|
||||||
def testRoundRobinPartitionAssignor() {
|
def testRoundRobinPartitionAssignor() {
|
||||||
val assignor = new RoundRobinAssignor
|
val assignor = new RoundRobinAssignor
|
||||||
|
|
|
@ -18,7 +18,7 @@
|
||||||
package kafka.consumer
|
package kafka.consumer
|
||||||
|
|
||||||
|
|
||||||
import junit.framework.Assert._
|
import org.junit.Assert._
|
||||||
import org.scalatest.junit.JUnitSuite
|
import org.scalatest.junit.JUnitSuite
|
||||||
import org.junit.Test
|
import org.junit.Test
|
||||||
import kafka.server.OffsetManager
|
import kafka.server.OffsetManager
|
||||||
|
|
|
@ -19,7 +19,7 @@ package kafka.consumer
|
||||||
|
|
||||||
import java.util.{Collections, Properties}
|
import java.util.{Collections, Properties}
|
||||||
|
|
||||||
import junit.framework.Assert._
|
import org.junit.Assert._
|
||||||
import kafka.common.MessageStreamsExistException
|
import kafka.common.MessageStreamsExistException
|
||||||
import kafka.integration.KafkaServerTestHarness
|
import kafka.integration.KafkaServerTestHarness
|
||||||
import kafka.javaapi.consumer.ConsumerRebalanceListener
|
import kafka.javaapi.consumer.ConsumerRebalanceListener
|
||||||
|
@ -30,11 +30,11 @@ import kafka.utils.TestUtils._
|
||||||
import kafka.utils._
|
import kafka.utils._
|
||||||
import org.I0Itec.zkclient.ZkClient
|
import org.I0Itec.zkclient.ZkClient
|
||||||
import org.apache.log4j.{Level, Logger}
|
import org.apache.log4j.{Level, Logger}
|
||||||
import org.scalatest.junit.JUnit3Suite
|
import org.junit.{After, Before}
|
||||||
|
|
||||||
import scala.collection._
|
import scala.collection._
|
||||||
|
|
||||||
class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with Logging {
|
class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging {
|
||||||
|
|
||||||
val RebalanceBackoffMs = 5000
|
val RebalanceBackoffMs = 5000
|
||||||
var dirs : ZKGroupTopicDirs = null
|
var dirs : ZKGroupTopicDirs = null
|
||||||
|
@ -54,11 +54,13 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
|
||||||
val consumer3 = "consumer3"
|
val consumer3 = "consumer3"
|
||||||
val nMessages = 2
|
val nMessages = 2
|
||||||
|
|
||||||
|
@Before
|
||||||
override def setUp() {
|
override def setUp() {
|
||||||
super.setUp()
|
super.setUp()
|
||||||
dirs = new ZKGroupTopicDirs(group, topic)
|
dirs = new ZKGroupTopicDirs(group, topic)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
override def tearDown() {
|
override def tearDown() {
|
||||||
super.tearDown()
|
super.tearDown()
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,10 +20,10 @@ package kafka.coordinator
|
||||||
|
|
||||||
import java.util.concurrent.TimeUnit
|
import java.util.concurrent.TimeUnit
|
||||||
|
|
||||||
import junit.framework.Assert._
|
import org.junit.Assert._
|
||||||
import kafka.common.TopicAndPartition
|
import kafka.common.TopicAndPartition
|
||||||
import kafka.server.{OffsetManager, ReplicaManager, KafkaConfig}
|
import kafka.server.{OffsetManager, KafkaConfig}
|
||||||
import kafka.utils.{KafkaScheduler, TestUtils}
|
import kafka.utils.TestUtils
|
||||||
import org.apache.kafka.common.protocol.Errors
|
import org.apache.kafka.common.protocol.Errors
|
||||||
import org.apache.kafka.common.requests.JoinGroupRequest
|
import org.apache.kafka.common.requests.JoinGroupRequest
|
||||||
import org.easymock.EasyMock
|
import org.easymock.EasyMock
|
||||||
|
|
|
@ -17,7 +17,7 @@
|
||||||
|
|
||||||
package kafka.coordinator
|
package kafka.coordinator
|
||||||
|
|
||||||
import junit.framework.Assert._
|
import org.junit.Assert._
|
||||||
import org.junit.{Before, Test}
|
import org.junit.{Before, Test}
|
||||||
import org.scalatest.junit.JUnitSuite
|
import org.scalatest.junit.JUnitSuite
|
||||||
|
|
||||||
|
|
|
@ -20,7 +20,7 @@ package kafka.coordinator
|
||||||
import kafka.server.KafkaConfig
|
import kafka.server.KafkaConfig
|
||||||
import kafka.utils.{ZkUtils, TestUtils}
|
import kafka.utils.{ZkUtils, TestUtils}
|
||||||
|
|
||||||
import junit.framework.Assert._
|
import org.junit.Assert._
|
||||||
import org.I0Itec.zkclient.{IZkDataListener, ZkClient}
|
import org.I0Itec.zkclient.{IZkDataListener, ZkClient}
|
||||||
import org.apache.zookeeper.data.Stat
|
import org.apache.zookeeper.data.Stat
|
||||||
import org.easymock.EasyMock
|
import org.easymock.EasyMock
|
||||||
|
|
|
@ -19,7 +19,7 @@ package kafka.coordinator
|
||||||
|
|
||||||
import kafka.common.TopicAndPartition
|
import kafka.common.TopicAndPartition
|
||||||
|
|
||||||
import junit.framework.Assert._
|
import org.junit.Assert._
|
||||||
import org.junit.Test
|
import org.junit.Test
|
||||||
import org.scalatest.junit.JUnitSuite
|
import org.scalatest.junit.JUnitSuite
|
||||||
|
|
||||||
|
|
|
@ -24,12 +24,11 @@ import kafka.utils.TestUtils
|
||||||
import kafka.serializer._
|
import kafka.serializer._
|
||||||
import kafka.producer.{Producer, KeyedMessage}
|
import kafka.producer.{Producer, KeyedMessage}
|
||||||
|
|
||||||
import org.junit.Test
|
import org.junit.{After, Before, Test}
|
||||||
import org.apache.log4j.{Level, Logger}
|
import org.apache.log4j.{Level, Logger}
|
||||||
import org.scalatest.junit.JUnit3Suite
|
import org.junit.Assert._
|
||||||
import junit.framework.Assert._
|
|
||||||
|
|
||||||
class AutoOffsetResetTest extends JUnit3Suite with KafkaServerTestHarness with Logging {
|
class AutoOffsetResetTest extends KafkaServerTestHarness with Logging {
|
||||||
|
|
||||||
def generateConfigs() = List(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect)))
|
def generateConfigs() = List(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect)))
|
||||||
|
|
||||||
|
@ -42,12 +41,14 @@ class AutoOffsetResetTest extends JUnit3Suite with KafkaServerTestHarness with L
|
||||||
|
|
||||||
val requestHandlerLogger = Logger.getLogger(classOf[kafka.server.KafkaRequestHandler])
|
val requestHandlerLogger = Logger.getLogger(classOf[kafka.server.KafkaRequestHandler])
|
||||||
|
|
||||||
|
@Before
|
||||||
override def setUp() {
|
override def setUp() {
|
||||||
super.setUp()
|
super.setUp()
|
||||||
// temporarily set request handler logger to a higher level
|
// temporarily set request handler logger to a higher level
|
||||||
requestHandlerLogger.setLevel(Level.FATAL)
|
requestHandlerLogger.setLevel(Level.FATAL)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
override def tearDown() {
|
override def tearDown() {
|
||||||
// restore set request handler logger to a higher level
|
// restore set request handler logger to a higher level
|
||||||
requestHandlerLogger.setLevel(Level.ERROR)
|
requestHandlerLogger.setLevel(Level.ERROR)
|
||||||
|
|
|
@ -19,18 +19,17 @@ package kafka.integration
|
||||||
|
|
||||||
import java.util.concurrent._
|
import java.util.concurrent._
|
||||||
import java.util.concurrent.atomic._
|
import java.util.concurrent.atomic._
|
||||||
|
import org.junit.{After, Before}
|
||||||
|
|
||||||
import scala.collection._
|
import scala.collection._
|
||||||
import junit.framework.Assert._
|
import org.junit.Assert._
|
||||||
|
|
||||||
import kafka.cluster._
|
import kafka.cluster._
|
||||||
import kafka.server._
|
import kafka.server._
|
||||||
import org.scalatest.junit.JUnit3Suite
|
|
||||||
import kafka.consumer._
|
import kafka.consumer._
|
||||||
import kafka.serializer._
|
|
||||||
import kafka.producer.{KeyedMessage, Producer}
|
|
||||||
import kafka.utils.TestUtils
|
import kafka.utils.TestUtils
|
||||||
|
|
||||||
class FetcherTest extends JUnit3Suite with KafkaServerTestHarness {
|
class FetcherTest extends KafkaServerTestHarness {
|
||||||
val numNodes = 1
|
val numNodes = 1
|
||||||
def generateConfigs() = TestUtils.createBrokerConfigs(numNodes, zkConnect).map(KafkaConfig.fromProps)
|
def generateConfigs() = TestUtils.createBrokerConfigs(numNodes, zkConnect).map(KafkaConfig.fromProps)
|
||||||
|
|
||||||
|
@ -40,6 +39,7 @@ class FetcherTest extends JUnit3Suite with KafkaServerTestHarness {
|
||||||
|
|
||||||
var fetcher: ConsumerFetcherManager = null
|
var fetcher: ConsumerFetcherManager = null
|
||||||
|
|
||||||
|
@Before
|
||||||
override def setUp() {
|
override def setUp() {
|
||||||
super.setUp
|
super.setUp
|
||||||
TestUtils.createTopic(zkClient, topic, partitionReplicaAssignment = Map(0 -> Seq(configs.head.brokerId)), servers = servers)
|
TestUtils.createTopic(zkClient, topic, partitionReplicaAssignment = Map(0 -> Seq(configs.head.brokerId)), servers = servers)
|
||||||
|
@ -59,6 +59,7 @@ class FetcherTest extends JUnit3Suite with KafkaServerTestHarness {
|
||||||
fetcher.startConnections(topicInfos, cluster)
|
fetcher.startConnections(topicInfos, cluster)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
override def tearDown() {
|
override def tearDown() {
|
||||||
fetcher.stopConnections()
|
fetcher.stopConnections()
|
||||||
super.tearDown
|
super.tearDown
|
||||||
|
|
|
@ -19,17 +19,18 @@ package kafka.integration
|
||||||
|
|
||||||
import java.util.Arrays
|
import java.util.Arrays
|
||||||
|
|
||||||
import scala.collection.mutable.Buffer
|
import kafka.common.KafkaException
|
||||||
import kafka.server._
|
import kafka.server._
|
||||||
import kafka.utils.{CoreUtils, TestUtils}
|
import kafka.utils.{CoreUtils, TestUtils}
|
||||||
import org.scalatest.junit.JUnit3Suite
|
|
||||||
import kafka.zk.ZooKeeperTestHarness
|
import kafka.zk.ZooKeeperTestHarness
|
||||||
import kafka.common.KafkaException
|
import org.junit.{After, Before}
|
||||||
|
|
||||||
|
import scala.collection.mutable.Buffer
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A test harness that brings up some number of broker nodes
|
* A test harness that brings up some number of broker nodes
|
||||||
*/
|
*/
|
||||||
trait KafkaServerTestHarness extends JUnit3Suite with ZooKeeperTestHarness {
|
trait KafkaServerTestHarness extends ZooKeeperTestHarness {
|
||||||
var instanceConfigs: Seq[KafkaConfig] = null
|
var instanceConfigs: Seq[KafkaConfig] = null
|
||||||
var servers: Buffer[KafkaServer] = null
|
var servers: Buffer[KafkaServer] = null
|
||||||
var brokerList: String = null
|
var brokerList: String = null
|
||||||
|
@ -51,7 +52,7 @@ trait KafkaServerTestHarness extends JUnit3Suite with ZooKeeperTestHarness {
|
||||||
|
|
||||||
def bootstrapUrl = servers.map(s => s.config.hostName + ":" + s.boundPort()).mkString(",")
|
def bootstrapUrl = servers.map(s => s.config.hostName + ":" + s.boundPort()).mkString(",")
|
||||||
|
|
||||||
|
@Before
|
||||||
override def setUp() {
|
override def setUp() {
|
||||||
super.setUp
|
super.setUp
|
||||||
if(configs.size <= 0)
|
if(configs.size <= 0)
|
||||||
|
@ -62,6 +63,7 @@ trait KafkaServerTestHarness extends JUnit3Suite with ZooKeeperTestHarness {
|
||||||
Arrays.fill(alive, true)
|
Arrays.fill(alive, true)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
override def tearDown() {
|
override def tearDown() {
|
||||||
servers.foreach(_.shutdown())
|
servers.foreach(_.shutdown())
|
||||||
servers.foreach(_.config.logDirs.foreach(CoreUtils.rm(_)))
|
servers.foreach(_.config.logDirs.foreach(CoreUtils.rm(_)))
|
||||||
|
|
|
@ -21,9 +21,8 @@ import java.util.Properties
|
||||||
|
|
||||||
import kafka.server.KafkaConfig
|
import kafka.server.KafkaConfig
|
||||||
import kafka.utils.TestUtils
|
import kafka.utils.TestUtils
|
||||||
import org.scalatest.junit.JUnit3Suite
|
|
||||||
|
|
||||||
class MinIsrConfigTest extends JUnit3Suite with KafkaServerTestHarness {
|
class MinIsrConfigTest extends KafkaServerTestHarness {
|
||||||
|
|
||||||
val overridingProps = new Properties()
|
val overridingProps = new Properties()
|
||||||
overridingProps.put(KafkaConfig.MinInSyncReplicasProp, "5")
|
overridingProps.put(KafkaConfig.MinInSyncReplicasProp, "5")
|
||||||
|
|
|
@ -18,13 +18,12 @@
|
||||||
package kafka.integration
|
package kafka.integration
|
||||||
|
|
||||||
import java.nio.ByteBuffer
|
import java.nio.ByteBuffer
|
||||||
import junit.framework.Assert._
|
import org.junit.Assert._
|
||||||
import kafka.api.{PartitionFetchInfo, FetchRequest, FetchRequestBuilder}
|
import kafka.api.{PartitionFetchInfo, FetchRequest, FetchRequestBuilder}
|
||||||
import kafka.server.{KafkaRequestHandler, KafkaConfig}
|
import kafka.server.{KafkaRequestHandler, KafkaConfig}
|
||||||
import kafka.producer.{KeyedMessage, Producer}
|
import kafka.producer.{KeyedMessage, Producer}
|
||||||
import org.apache.log4j.{Level, Logger}
|
import org.apache.log4j.{Level, Logger}
|
||||||
import kafka.zk.ZooKeeperTestHarness
|
import kafka.zk.ZooKeeperTestHarness
|
||||||
import org.scalatest.junit.JUnit3Suite
|
|
||||||
import scala.collection._
|
import scala.collection._
|
||||||
import kafka.common.{TopicAndPartition, ErrorMapping, UnknownTopicOrPartitionException, OffsetOutOfRangeException}
|
import kafka.common.{TopicAndPartition, ErrorMapping, UnknownTopicOrPartitionException, OffsetOutOfRangeException}
|
||||||
import kafka.utils.{StaticPartitioner, TestUtils, CoreUtils}
|
import kafka.utils.{StaticPartitioner, TestUtils, CoreUtils}
|
||||||
|
@ -34,7 +33,7 @@ import java.util.Properties
|
||||||
/**
|
/**
|
||||||
* End to end tests of the primitive apis against a local server
|
* End to end tests of the primitive apis against a local server
|
||||||
*/
|
*/
|
||||||
class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with ZooKeeperTestHarness {
|
class PrimitiveApiTest extends ProducerConsumerTestHarness with ZooKeeperTestHarness {
|
||||||
val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
|
val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
|
||||||
|
|
||||||
def generateConfigs() = List(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect)))
|
def generateConfigs() = List(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect)))
|
||||||
|
|
|
@ -5,8 +5,8 @@
|
||||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
* (the "License"); you may not use this file except in compliance with
|
* (the "License"); you may not use this file except in compliance with
|
||||||
* the License. You may obtain a copy of the License at
|
* the License. You may obtain a copy of the License at
|
||||||
*
|
*
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
*
|
*
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
@ -18,28 +18,30 @@
|
||||||
package kafka.integration
|
package kafka.integration
|
||||||
|
|
||||||
import kafka.consumer.SimpleConsumer
|
import kafka.consumer.SimpleConsumer
|
||||||
import org.scalatest.junit.JUnit3Suite
|
import org.junit.{After, Before}
|
||||||
import kafka.producer.Producer
|
import kafka.producer.Producer
|
||||||
import kafka.utils.{StaticPartitioner, TestUtils}
|
import kafka.utils.{StaticPartitioner, TestUtils}
|
||||||
import kafka.serializer.StringEncoder
|
import kafka.serializer.StringEncoder
|
||||||
|
|
||||||
trait ProducerConsumerTestHarness extends JUnit3Suite with KafkaServerTestHarness {
|
trait ProducerConsumerTestHarness extends KafkaServerTestHarness {
|
||||||
val host = "localhost"
|
val host = "localhost"
|
||||||
var producer: Producer[String, String] = null
|
var producer: Producer[String, String] = null
|
||||||
var consumer: SimpleConsumer = null
|
var consumer: SimpleConsumer = null
|
||||||
|
|
||||||
|
@Before
|
||||||
override def setUp() {
|
override def setUp() {
|
||||||
super.setUp
|
super.setUp
|
||||||
producer = TestUtils.createProducer[String, String](TestUtils.getBrokerListStrFromServers(servers),
|
producer = TestUtils.createProducer[String, String](TestUtils.getBrokerListStrFromServers(servers),
|
||||||
encoder = classOf[StringEncoder].getName,
|
encoder = classOf[StringEncoder].getName,
|
||||||
keyEncoder = classOf[StringEncoder].getName,
|
keyEncoder = classOf[StringEncoder].getName,
|
||||||
partitioner = classOf[StaticPartitioner].getName)
|
partitioner = classOf[StaticPartitioner].getName)
|
||||||
consumer = new SimpleConsumer(host, servers(0).boundPort(), 1000000, 64*1024, "")
|
consumer = new SimpleConsumer(host, servers(0).boundPort(), 1000000, 64 * 1024, "")
|
||||||
}
|
}
|
||||||
|
|
||||||
override def tearDown() {
|
@After
|
||||||
producer.close()
|
override def tearDown() {
|
||||||
consumer.close()
|
producer.close()
|
||||||
super.tearDown
|
consumer.close()
|
||||||
}
|
super.tearDown
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,18 +17,19 @@
|
||||||
|
|
||||||
package kafka.integration
|
package kafka.integration
|
||||||
|
|
||||||
import org.scalatest.junit.JUnit3Suite
|
import org.junit.{After, Before}
|
||||||
import kafka.zk.ZooKeeperTestHarness
|
import kafka.zk.ZooKeeperTestHarness
|
||||||
import kafka.utils.TestUtils._
|
import kafka.utils.TestUtils._
|
||||||
import junit.framework.Assert._
|
import org.junit.Assert._
|
||||||
import kafka.utils.{CoreUtils, TestUtils}
|
import kafka.utils.{CoreUtils, TestUtils}
|
||||||
import kafka.server.{KafkaConfig, KafkaServer}
|
import kafka.server.{KafkaConfig, KafkaServer}
|
||||||
|
|
||||||
class RollingBounceTest extends JUnit3Suite with ZooKeeperTestHarness {
|
class RollingBounceTest extends ZooKeeperTestHarness {
|
||||||
|
|
||||||
val partitionId = 0
|
val partitionId = 0
|
||||||
var servers: Seq[KafkaServer] = null
|
var servers: Seq[KafkaServer] = null
|
||||||
|
|
||||||
|
@Before
|
||||||
override def setUp() {
|
override def setUp() {
|
||||||
super.setUp()
|
super.setUp()
|
||||||
// controlled.shutdown.enable is true by default
|
// controlled.shutdown.enable is true by default
|
||||||
|
@ -39,6 +40,7 @@ class RollingBounceTest extends JUnit3Suite with ZooKeeperTestHarness {
|
||||||
servers = configs.map(c => TestUtils.createServer(KafkaConfig.fromProps(c)))
|
servers = configs.map(c => TestUtils.createServer(KafkaConfig.fromProps(c)))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
override def tearDown() {
|
override def tearDown() {
|
||||||
servers.foreach(_.shutdown())
|
servers.foreach(_.shutdown())
|
||||||
servers.foreach(server => CoreUtils.rm(server.config.logDirs))
|
servers.foreach(server => CoreUtils.rm(server.config.logDirs))
|
||||||
|
|
|
@ -19,9 +19,8 @@ package kafka.integration
|
||||||
|
|
||||||
import java.nio.ByteBuffer
|
import java.nio.ByteBuffer
|
||||||
|
|
||||||
import junit.framework.Assert._
|
|
||||||
import kafka.admin.AdminUtils
|
import kafka.admin.AdminUtils
|
||||||
import kafka.api.{TopicMetadataResponse, TopicMetadataRequest}
|
import kafka.api.{TopicMetadataRequest, TopicMetadataResponse}
|
||||||
import kafka.client.ClientUtils
|
import kafka.client.ClientUtils
|
||||||
import kafka.cluster.{Broker, BrokerEndPoint}
|
import kafka.cluster.{Broker, BrokerEndPoint}
|
||||||
import kafka.common.ErrorMapping
|
import kafka.common.ErrorMapping
|
||||||
|
@ -30,14 +29,16 @@ import kafka.utils.TestUtils
|
||||||
import kafka.utils.TestUtils._
|
import kafka.utils.TestUtils._
|
||||||
import kafka.zk.ZooKeeperTestHarness
|
import kafka.zk.ZooKeeperTestHarness
|
||||||
import org.apache.kafka.common.protocol.SecurityProtocol
|
import org.apache.kafka.common.protocol.SecurityProtocol
|
||||||
import org.scalatest.junit.JUnit3Suite
|
import org.junit.Assert._
|
||||||
|
import org.junit.{After, Before}
|
||||||
|
|
||||||
class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
|
class TopicMetadataTest extends ZooKeeperTestHarness {
|
||||||
private var server1: KafkaServer = null
|
private var server1: KafkaServer = null
|
||||||
var brokerEndPoints: Seq[BrokerEndPoint] = null
|
var brokerEndPoints: Seq[BrokerEndPoint] = null
|
||||||
var adHocConfigs: Seq[KafkaConfig] = null
|
var adHocConfigs: Seq[KafkaConfig] = null
|
||||||
val numConfigs: Int = 4
|
val numConfigs: Int = 4
|
||||||
|
|
||||||
|
@Before
|
||||||
override def setUp() {
|
override def setUp() {
|
||||||
super.setUp()
|
super.setUp()
|
||||||
val props = createBrokerConfigs(numConfigs, zkConnect)
|
val props = createBrokerConfigs(numConfigs, zkConnect)
|
||||||
|
@ -47,6 +48,7 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
|
||||||
brokerEndPoints = Seq(new Broker(server1.config.brokerId, server1.config.hostName, server1.boundPort()).getBrokerEndPoint(SecurityProtocol.PLAINTEXT))
|
brokerEndPoints = Seq(new Broker(server1.config.brokerId, server1.config.hostName, server1.boundPort()).getBrokerEndPoint(SecurityProtocol.PLAINTEXT))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
override def tearDown() {
|
override def tearDown() {
|
||||||
server1.shutdown()
|
server1.shutdown()
|
||||||
super.tearDown()
|
super.tearDown()
|
||||||
|
|
|
@ -18,24 +18,22 @@
|
||||||
package kafka.integration
|
package kafka.integration
|
||||||
|
|
||||||
import org.apache.kafka.common.config.ConfigException
|
import org.apache.kafka.common.config.ConfigException
|
||||||
|
import org.junit.{After, Before}
|
||||||
|
|
||||||
import scala.collection.mutable.MutableList
|
|
||||||
import scala.util.Random
|
import scala.util.Random
|
||||||
import org.apache.log4j.{Level, Logger}
|
import org.apache.log4j.{Level, Logger}
|
||||||
import org.scalatest.junit.JUnit3Suite
|
|
||||||
import java.util.Properties
|
import java.util.Properties
|
||||||
import junit.framework.Assert._
|
|
||||||
import kafka.admin.AdminUtils
|
import kafka.admin.AdminUtils
|
||||||
import kafka.common.FailedToSendMessageException
|
import kafka.common.FailedToSendMessageException
|
||||||
import kafka.consumer.{Consumer, ConsumerConfig, ConsumerTimeoutException}
|
import kafka.consumer.{Consumer, ConsumerConfig}
|
||||||
import kafka.producer.{KeyedMessage, Producer}
|
|
||||||
import kafka.serializer.StringDecoder
|
import kafka.serializer.StringDecoder
|
||||||
import kafka.server.{KafkaConfig, KafkaServer}
|
import kafka.server.{KafkaConfig, KafkaServer}
|
||||||
import kafka.utils.CoreUtils
|
import kafka.utils.CoreUtils
|
||||||
import kafka.utils.TestUtils._
|
import kafka.utils.TestUtils._
|
||||||
import kafka.zk.ZooKeeperTestHarness
|
import kafka.zk.ZooKeeperTestHarness
|
||||||
|
import org.junit.Assert._
|
||||||
|
|
||||||
class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
|
class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
|
||||||
val brokerId1 = 0
|
val brokerId1 = 0
|
||||||
val brokerId2 = 1
|
val brokerId2 = 1
|
||||||
|
|
||||||
|
@ -58,6 +56,7 @@ class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
|
||||||
val syncProducerLogger = Logger.getLogger(classOf[kafka.producer.SyncProducer])
|
val syncProducerLogger = Logger.getLogger(classOf[kafka.producer.SyncProducer])
|
||||||
val eventHandlerLogger = Logger.getLogger(classOf[kafka.producer.async.DefaultEventHandler[Object, Object]])
|
val eventHandlerLogger = Logger.getLogger(classOf[kafka.producer.async.DefaultEventHandler[Object, Object]])
|
||||||
|
|
||||||
|
@Before
|
||||||
override def setUp() {
|
override def setUp() {
|
||||||
super.setUp()
|
super.setUp()
|
||||||
|
|
||||||
|
@ -77,6 +76,7 @@ class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
|
||||||
eventHandlerLogger.setLevel(Level.FATAL)
|
eventHandlerLogger.setLevel(Level.FATAL)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
override def tearDown() {
|
override def tearDown() {
|
||||||
servers.foreach(server => shutdownServer(server))
|
servers.foreach(server => shutdownServer(server))
|
||||||
servers.foreach(server => CoreUtils.rm(server.config.logDirs))
|
servers.foreach(server => CoreUtils.rm(server.config.logDirs))
|
||||||
|
|
|
@ -20,7 +20,6 @@ package kafka.javaapi.consumer
|
||||||
import java.util.Properties
|
import java.util.Properties
|
||||||
|
|
||||||
import kafka.server._
|
import kafka.server._
|
||||||
import kafka.message._
|
|
||||||
import kafka.serializer._
|
import kafka.serializer._
|
||||||
import kafka.integration.KafkaServerTestHarness
|
import kafka.integration.KafkaServerTestHarness
|
||||||
import kafka.producer.KeyedMessage
|
import kafka.producer.KeyedMessage
|
||||||
|
@ -33,12 +32,11 @@ import kafka.common.MessageStreamsExistException
|
||||||
|
|
||||||
import scala.collection.JavaConversions
|
import scala.collection.JavaConversions
|
||||||
|
|
||||||
import org.scalatest.junit.JUnit3Suite
|
|
||||||
import org.apache.log4j.{Level, Logger}
|
import org.apache.log4j.{Level, Logger}
|
||||||
import junit.framework.Assert._
|
import org.junit.Assert._
|
||||||
|
|
||||||
|
|
||||||
class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with ZooKeeperTestHarness with Logging {
|
class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with ZooKeeperTestHarness with Logging {
|
||||||
val numNodes = 2
|
val numNodes = 2
|
||||||
val numParts = 2
|
val numParts = 2
|
||||||
val topic = "topic1"
|
val topic = "topic1"
|
||||||
|
|
|
@ -17,7 +17,7 @@
|
||||||
|
|
||||||
package kafka.javaapi.message
|
package kafka.javaapi.message
|
||||||
|
|
||||||
import junit.framework.Assert._
|
import org.junit.Assert._
|
||||||
import org.scalatest.junit.JUnitSuite
|
import org.scalatest.junit.JUnitSuite
|
||||||
import org.junit.Test
|
import org.junit.Test
|
||||||
import kafka.utils.TestUtils
|
import kafka.utils.TestUtils
|
||||||
|
|
|
@ -17,7 +17,7 @@
|
||||||
|
|
||||||
package kafka.javaapi.message
|
package kafka.javaapi.message
|
||||||
|
|
||||||
import junit.framework.Assert._
|
import org.junit.Assert._
|
||||||
import org.junit.Test
|
import org.junit.Test
|
||||||
import kafka.message.{DefaultCompressionCodec, CompressionCodec, NoCompressionCodec, Message}
|
import kafka.message.{DefaultCompressionCodec, CompressionCodec, NoCompressionCodec, Message}
|
||||||
|
|
||||||
|
|
|
@ -17,19 +17,20 @@
|
||||||
|
|
||||||
package kafka.log
|
package kafka.log
|
||||||
|
|
||||||
import java.util.Properties
|
|
||||||
|
|
||||||
import junit.framework.Assert._
|
|
||||||
import org.scalatest.junit.JUnitSuite
|
|
||||||
import org.junit.{After, Test}
|
|
||||||
import java.nio._
|
|
||||||
import java.io.File
|
import java.io.File
|
||||||
import scala.collection._
|
import java.nio._
|
||||||
import kafka.common._
|
import java.util.Properties
|
||||||
import kafka.utils._
|
|
||||||
import kafka.message._
|
|
||||||
import java.util.concurrent.atomic.AtomicLong
|
import java.util.concurrent.atomic.AtomicLong
|
||||||
|
|
||||||
|
import kafka.common._
|
||||||
|
import kafka.message._
|
||||||
|
import kafka.utils._
|
||||||
import org.apache.kafka.common.utils.Utils
|
import org.apache.kafka.common.utils.Utils
|
||||||
|
import org.junit.Assert._
|
||||||
|
import org.junit.{After, Test}
|
||||||
|
import org.scalatest.junit.JUnitSuite
|
||||||
|
|
||||||
|
import scala.collection._
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Unit tests for the log cleaning logic
|
* Unit tests for the log cleaning logic
|
||||||
|
|
|
@ -20,7 +20,7 @@ package kafka.log
|
||||||
import java.io._
|
import java.io._
|
||||||
import java.nio._
|
import java.nio._
|
||||||
import java.util.concurrent.atomic._
|
import java.util.concurrent.atomic._
|
||||||
import junit.framework.Assert._
|
import org.junit.Assert._
|
||||||
import kafka.utils.TestUtils._
|
import kafka.utils.TestUtils._
|
||||||
import kafka.message._
|
import kafka.message._
|
||||||
import org.junit.Test
|
import org.junit.Test
|
||||||
|
|
|
@ -30,16 +30,14 @@ import org.junit._
|
||||||
import org.junit.runner.RunWith
|
import org.junit.runner.RunWith
|
||||||
import org.junit.runners.Parameterized
|
import org.junit.runners.Parameterized
|
||||||
import org.junit.runners.Parameterized.Parameters
|
import org.junit.runners.Parameterized.Parameters
|
||||||
import org.scalatest.junit.JUnit3Suite
|
|
||||||
|
|
||||||
import scala.collection._
|
import scala.collection._
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This is an integration test that tests the fully integrated log cleaner
|
* This is an integration test that tests the fully integrated log cleaner
|
||||||
*/
|
*/
|
||||||
@RunWith(value = classOf[Parameterized])
|
@RunWith(value = classOf[Parameterized])
|
||||||
class LogCleanerIntegrationTest(compressionCodec: String) extends JUnit3Suite {
|
class LogCleanerIntegrationTest(compressionCodec: String) {
|
||||||
|
|
||||||
val time = new MockTime()
|
val time = new MockTime()
|
||||||
val segmentSize = 100
|
val segmentSize = 100
|
||||||
|
|
|
@ -21,9 +21,9 @@ import java.util.Properties
|
||||||
|
|
||||||
import org.apache.kafka.common.config.ConfigException
|
import org.apache.kafka.common.config.ConfigException
|
||||||
import org.junit.{Assert, Test}
|
import org.junit.{Assert, Test}
|
||||||
import org.scalatest.junit.JUnit3Suite
|
import org.scalatest.Assertions._
|
||||||
|
|
||||||
class LogConfigTest extends JUnit3Suite {
|
class LogConfigTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
def testFromPropsEmpty() {
|
def testFromPropsEmpty() {
|
||||||
|
|
|
@ -19,14 +19,14 @@ package kafka.log
|
||||||
|
|
||||||
import java.io._
|
import java.io._
|
||||||
import java.util.Properties
|
import java.util.Properties
|
||||||
import junit.framework.Assert._
|
|
||||||
import org.junit.Test
|
|
||||||
import org.scalatest.junit.JUnit3Suite
|
|
||||||
import kafka.server.{BrokerState, OffsetCheckpoint}
|
|
||||||
import kafka.common._
|
|
||||||
import kafka.utils._
|
|
||||||
|
|
||||||
class LogManagerTest extends JUnit3Suite {
|
import kafka.common._
|
||||||
|
import kafka.server.OffsetCheckpoint
|
||||||
|
import kafka.utils._
|
||||||
|
import org.junit.Assert._
|
||||||
|
import org.junit.{After, Before, Test}
|
||||||
|
|
||||||
|
class LogManagerTest {
|
||||||
|
|
||||||
val time: MockTime = new MockTime()
|
val time: MockTime = new MockTime()
|
||||||
val maxRollInterval = 100
|
val maxRollInterval = 100
|
||||||
|
@ -41,20 +41,20 @@ class LogManagerTest extends JUnit3Suite {
|
||||||
val name = "kafka"
|
val name = "kafka"
|
||||||
val veryLargeLogFlushInterval = 10000000L
|
val veryLargeLogFlushInterval = 10000000L
|
||||||
|
|
||||||
override def setUp() {
|
@Before
|
||||||
super.setUp()
|
def setUp() {
|
||||||
logDir = TestUtils.tempDir()
|
logDir = TestUtils.tempDir()
|
||||||
logManager = createLogManager()
|
logManager = createLogManager()
|
||||||
logManager.startup
|
logManager.startup
|
||||||
logDir = logManager.logDirs(0)
|
logDir = logManager.logDirs(0)
|
||||||
}
|
}
|
||||||
|
|
||||||
override def tearDown() {
|
@After
|
||||||
|
def tearDown() {
|
||||||
if(logManager != null)
|
if(logManager != null)
|
||||||
logManager.shutdown()
|
logManager.shutdown()
|
||||||
CoreUtils.rm(logDir)
|
CoreUtils.rm(logDir)
|
||||||
logManager.logDirs.foreach(CoreUtils.rm(_))
|
logManager.logDirs.foreach(CoreUtils.rm(_))
|
||||||
super.tearDown()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -16,19 +16,15 @@
|
||||||
*/
|
*/
|
||||||
package kafka.log
|
package kafka.log
|
||||||
|
|
||||||
import junit.framework.Assert._
|
import org.junit.Assert._
|
||||||
import java.util.concurrent.atomic._
|
import java.util.concurrent.atomic._
|
||||||
import java.io.File
|
|
||||||
import java.io.RandomAccessFile
|
|
||||||
import java.util.Random
|
|
||||||
import org.junit.{Test, After}
|
import org.junit.{Test, After}
|
||||||
import org.scalatest.junit.JUnit3Suite
|
|
||||||
import kafka.utils.TestUtils
|
import kafka.utils.TestUtils
|
||||||
import kafka.message._
|
import kafka.message._
|
||||||
import kafka.utils.SystemTime
|
import kafka.utils.SystemTime
|
||||||
import scala.collection._
|
import scala.collection._
|
||||||
|
|
||||||
class LogSegmentTest extends JUnit3Suite {
|
class LogSegmentTest {
|
||||||
|
|
||||||
val segments = mutable.ArrayBuffer[LogSegment]()
|
val segments = mutable.ArrayBuffer[LogSegment]()
|
||||||
|
|
||||||
|
|
|
@ -20,7 +20,7 @@ package kafka.log
|
||||||
import java.io._
|
import java.io._
|
||||||
import java.util.Properties
|
import java.util.Properties
|
||||||
import java.util.concurrent.atomic._
|
import java.util.concurrent.atomic._
|
||||||
import junit.framework.Assert._
|
import org.junit.Assert._
|
||||||
import org.scalatest.junit.JUnitSuite
|
import org.scalatest.junit.JUnitSuite
|
||||||
import org.junit.{After, Before, Test}
|
import org.junit.{After, Before, Test}
|
||||||
import kafka.message._
|
import kafka.message._
|
||||||
|
|
|
@ -18,7 +18,7 @@
|
||||||
package kafka.log
|
package kafka.log
|
||||||
|
|
||||||
import java.io._
|
import java.io._
|
||||||
import junit.framework.Assert._
|
import org.junit.Assert._
|
||||||
import java.util.{Collections, Arrays}
|
import java.util.{Collections, Arrays}
|
||||||
import org.junit._
|
import org.junit._
|
||||||
import org.scalatest.junit.JUnitSuite
|
import org.scalatest.junit.JUnitSuite
|
||||||
|
|
|
@ -20,7 +20,7 @@ package kafka.log
|
||||||
import java.nio._
|
import java.nio._
|
||||||
import org.junit._
|
import org.junit._
|
||||||
import org.scalatest.junit.JUnitSuite
|
import org.scalatest.junit.JUnitSuite
|
||||||
import junit.framework.Assert._
|
import org.junit.Assert._
|
||||||
|
|
||||||
class OffsetMapTest extends JUnitSuite {
|
class OffsetMapTest extends JUnitSuite {
|
||||||
|
|
||||||
|
|
|
@ -18,7 +18,7 @@
|
||||||
package kafka.message
|
package kafka.message
|
||||||
|
|
||||||
import java.io.RandomAccessFile
|
import java.io.RandomAccessFile
|
||||||
import junit.framework.Assert._
|
import org.junit.Assert._
|
||||||
import kafka.utils.TestUtils._
|
import kafka.utils.TestUtils._
|
||||||
import kafka.log.FileMessageSet
|
import kafka.log.FileMessageSet
|
||||||
import org.scalatest.junit.JUnitSuite
|
import org.scalatest.junit.JUnitSuite
|
||||||
|
|
|
@ -19,7 +19,7 @@ package kafka.message
|
||||||
|
|
||||||
import java.nio._
|
import java.nio._
|
||||||
import java.util.concurrent.atomic.AtomicLong
|
import java.util.concurrent.atomic.AtomicLong
|
||||||
import junit.framework.Assert._
|
import org.junit.Assert._
|
||||||
import org.junit.Test
|
import org.junit.Test
|
||||||
import kafka.utils.TestUtils
|
import kafka.utils.TestUtils
|
||||||
|
|
||||||
|
|
|
@ -21,7 +21,7 @@ import java.io.ByteArrayOutputStream
|
||||||
import scala.collection._
|
import scala.collection._
|
||||||
import org.scalatest.junit.JUnitSuite
|
import org.scalatest.junit.JUnitSuite
|
||||||
import org.junit._
|
import org.junit._
|
||||||
import junit.framework.Assert._
|
import org.junit.Assert._
|
||||||
|
|
||||||
class MessageCompressionTest extends JUnitSuite {
|
class MessageCompressionTest extends JUnitSuite {
|
||||||
|
|
||||||
|
|
|
@ -20,7 +20,7 @@ package kafka.message
|
||||||
import java.nio._
|
import java.nio._
|
||||||
import java.util.HashMap
|
import java.util.HashMap
|
||||||
import scala.collection._
|
import scala.collection._
|
||||||
import junit.framework.Assert._
|
import org.junit.Assert._
|
||||||
import org.scalatest.junit.JUnitSuite
|
import org.scalatest.junit.JUnitSuite
|
||||||
import org.junit.{Before, Test}
|
import org.junit.{Before, Test}
|
||||||
import kafka.utils.TestUtils
|
import kafka.utils.TestUtils
|
||||||
|
|
|
@ -20,7 +20,7 @@ package kafka.message
|
||||||
import java.io.{InputStream, ByteArrayInputStream, ByteArrayOutputStream}
|
import java.io.{InputStream, ByteArrayInputStream, ByteArrayOutputStream}
|
||||||
import java.nio.ByteBuffer
|
import java.nio.ByteBuffer
|
||||||
import java.util.Random
|
import java.util.Random
|
||||||
import junit.framework.Assert._
|
import org.junit.Assert._
|
||||||
import org.junit.Test
|
import org.junit.Test
|
||||||
import org.scalatest.junit.JUnitSuite
|
import org.scalatest.junit.JUnitSuite
|
||||||
|
|
||||||
|
|
|
@ -18,12 +18,11 @@ package kafka.metrics
|
||||||
*/
|
*/
|
||||||
|
|
||||||
import org.junit.Test
|
import org.junit.Test
|
||||||
import org.scalatest.junit.JUnit3Suite
|
|
||||||
import java.util.concurrent.TimeUnit
|
import java.util.concurrent.TimeUnit
|
||||||
import junit.framework.Assert._
|
import org.junit.Assert._
|
||||||
import com.yammer.metrics.core.{MetricsRegistry, Clock}
|
import com.yammer.metrics.core.{MetricsRegistry, Clock}
|
||||||
|
|
||||||
class KafkaTimerTest extends JUnit3Suite {
|
class KafkaTimerTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
def testKafkaTimer() {
|
def testKafkaTimer() {
|
||||||
|
|
|
@ -21,11 +21,10 @@ import java.util.Properties
|
||||||
|
|
||||||
import com.yammer.metrics.Metrics
|
import com.yammer.metrics.Metrics
|
||||||
import com.yammer.metrics.core.MetricPredicate
|
import com.yammer.metrics.core.MetricPredicate
|
||||||
import org.junit.Test
|
import org.junit.{After, Test}
|
||||||
import junit.framework.Assert._
|
import org.junit.Assert._
|
||||||
import kafka.integration.KafkaServerTestHarness
|
import kafka.integration.KafkaServerTestHarness
|
||||||
import kafka.server._
|
import kafka.server._
|
||||||
import kafka.message._
|
|
||||||
import kafka.serializer._
|
import kafka.serializer._
|
||||||
import kafka.utils._
|
import kafka.utils._
|
||||||
import kafka.admin.AdminUtils
|
import kafka.admin.AdminUtils
|
||||||
|
@ -33,9 +32,8 @@ import kafka.utils.TestUtils._
|
||||||
import scala.collection._
|
import scala.collection._
|
||||||
import scala.collection.JavaConversions._
|
import scala.collection.JavaConversions._
|
||||||
import scala.util.matching.Regex
|
import scala.util.matching.Regex
|
||||||
import org.scalatest.junit.JUnit3Suite
|
|
||||||
|
|
||||||
class MetricsTest extends JUnit3Suite with KafkaServerTestHarness with Logging {
|
class MetricsTest extends KafkaServerTestHarness with Logging {
|
||||||
val numNodes = 2
|
val numNodes = 2
|
||||||
val numParts = 2
|
val numParts = 2
|
||||||
val topic = "topic1"
|
val topic = "topic1"
|
||||||
|
@ -48,6 +46,7 @@ class MetricsTest extends JUnit3Suite with KafkaServerTestHarness with Logging {
|
||||||
|
|
||||||
val nMessages = 2
|
val nMessages = 2
|
||||||
|
|
||||||
|
@After
|
||||||
override def tearDown() {
|
override def tearDown() {
|
||||||
super.tearDown()
|
super.tearDown()
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,27 +17,26 @@
|
||||||
|
|
||||||
package kafka.network;
|
package kafka.network;
|
||||||
|
|
||||||
import java.net._
|
|
||||||
import java.io._
|
import java.io._
|
||||||
|
import java.net._
|
||||||
|
import java.nio.ByteBuffer
|
||||||
|
import java.util.Random
|
||||||
|
|
||||||
|
import kafka.api.ProducerRequest
|
||||||
import kafka.cluster.EndPoint
|
import kafka.cluster.EndPoint
|
||||||
|
import kafka.common.TopicAndPartition
|
||||||
|
import kafka.message.ByteBufferMessageSet
|
||||||
|
import kafka.producer.SyncProducerConfig
|
||||||
import org.apache.kafka.common.metrics.Metrics
|
import org.apache.kafka.common.metrics.Metrics
|
||||||
import org.apache.kafka.common.network.NetworkSend
|
import org.apache.kafka.common.network.NetworkSend
|
||||||
import org.apache.kafka.common.protocol.SecurityProtocol
|
import org.apache.kafka.common.protocol.SecurityProtocol
|
||||||
import org.apache.kafka.common.utils.SystemTime
|
import org.apache.kafka.common.utils.SystemTime
|
||||||
|
import org.junit.Assert._
|
||||||
import org.junit._
|
import org.junit._
|
||||||
import org.scalatest.junit.JUnitSuite
|
|
||||||
import java.util.Random
|
|
||||||
import junit.framework.Assert._
|
|
||||||
import kafka.producer.SyncProducerConfig
|
|
||||||
import kafka.api.ProducerRequest
|
|
||||||
import java.nio.ByteBuffer
|
|
||||||
import kafka.common.TopicAndPartition
|
|
||||||
import kafka.message.ByteBufferMessageSet
|
|
||||||
import java.nio.channels.SelectionKey
|
|
||||||
import kafka.utils.TestUtils
|
|
||||||
import scala.collection.Map
|
import scala.collection.Map
|
||||||
|
|
||||||
class SocketServerTest extends JUnitSuite {
|
class SocketServerTest {
|
||||||
|
|
||||||
val server: SocketServer = new SocketServer(0,
|
val server: SocketServer = new SocketServer(0,
|
||||||
Map(SecurityProtocol.PLAINTEXT -> EndPoint(null, 0, SecurityProtocol.PLAINTEXT),
|
Map(SecurityProtocol.PLAINTEXT -> EndPoint(null, 0, SecurityProtocol.PLAINTEXT),
|
||||||
|
@ -84,11 +83,11 @@ class SocketServerTest extends JUnitSuite {
|
||||||
new Socket("localhost", server.boundPort(protocol))
|
new Socket("localhost", server.boundPort(protocol))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@After
|
@After
|
||||||
def cleanup() {
|
def cleanup() {
|
||||||
server.shutdown()
|
server.shutdown()
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
def simpleRequest() {
|
def simpleRequest() {
|
||||||
val plainSocket = connect(protocol = SecurityProtocol.PLAINTEXT)
|
val plainSocket = connect(protocol = SecurityProtocol.PLAINTEXT)
|
||||||
|
@ -175,7 +174,7 @@ class SocketServerTest extends JUnitSuite {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
def testMaxConnectionsPerIPOverrides(): Unit = {
|
def testMaxConnectionsPerIPOverrides() {
|
||||||
val overrideNum = 6
|
val overrideNum = 6
|
||||||
val overrides: Map[String, Int] = Map("localhost" -> overrideNum)
|
val overrides: Map[String, Int] = Map("localhost" -> overrideNum)
|
||||||
val overrideServer: SocketServer = new SocketServer(0,
|
val overrideServer: SocketServer = new SocketServer(0,
|
||||||
|
|
|
@ -19,36 +19,27 @@ package kafka.producer
|
||||||
|
|
||||||
import java.util.Properties
|
import java.util.Properties
|
||||||
import java.util.concurrent.LinkedBlockingQueue
|
import java.util.concurrent.LinkedBlockingQueue
|
||||||
import junit.framework.Assert._
|
import org.junit.Assert._
|
||||||
import org.easymock.EasyMock
|
import org.easymock.EasyMock
|
||||||
import org.junit.Test
|
import org.junit.Test
|
||||||
import kafka.api._
|
import kafka.api._
|
||||||
import kafka.cluster.{BrokerEndPoint, Broker}
|
import kafka.cluster.BrokerEndPoint
|
||||||
import kafka.common._
|
import kafka.common._
|
||||||
import kafka.message._
|
import kafka.message._
|
||||||
import kafka.producer.async._
|
import kafka.producer.async._
|
||||||
import kafka.serializer._
|
import kafka.serializer._
|
||||||
import kafka.server.KafkaConfig
|
import kafka.server.KafkaConfig
|
||||||
import kafka.utils.TestUtils._
|
import kafka.utils.TestUtils._
|
||||||
import org.scalatest.junit.JUnit3Suite
|
|
||||||
import scala.collection.Map
|
import scala.collection.Map
|
||||||
import scala.collection.mutable.ArrayBuffer
|
import scala.collection.mutable.ArrayBuffer
|
||||||
import kafka.utils._
|
import kafka.utils._
|
||||||
|
|
||||||
class AsyncProducerTest extends JUnit3Suite {
|
class AsyncProducerTest {
|
||||||
// One of the few cases we can just set a fixed port because the producer is mocked out here since this uses mocks
|
// One of the few cases we can just set a fixed port because the producer is mocked out here since this uses mocks
|
||||||
val props = Seq(createBrokerConfig(1, "127.0.0.1:1", port=65534))
|
val props = Seq(createBrokerConfig(1, "127.0.0.1:1", port=65534))
|
||||||
val configs = props.map(KafkaConfig.fromProps)
|
val configs = props.map(KafkaConfig.fromProps)
|
||||||
val brokerList = configs.map(c => org.apache.kafka.common.utils.Utils.formatAddress(c.hostName, c.port)).mkString(",")
|
val brokerList = configs.map(c => org.apache.kafka.common.utils.Utils.formatAddress(c.hostName, c.port)).mkString(",")
|
||||||
|
|
||||||
override def setUp() {
|
|
||||||
super.setUp()
|
|
||||||
}
|
|
||||||
|
|
||||||
override def tearDown() {
|
|
||||||
super.tearDown()
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
def testProducerQueueSize() {
|
def testProducerQueueSize() {
|
||||||
// a mock event handler that blocks
|
// a mock event handler that blocks
|
||||||
|
|
|
@ -17,26 +17,24 @@
|
||||||
|
|
||||||
package kafka.producer
|
package kafka.producer
|
||||||
|
|
||||||
import org.scalatest.TestFailedException
|
import java.util
|
||||||
import org.scalatest.junit.JUnit3Suite
|
import java.util.Properties
|
||||||
|
|
||||||
|
import kafka.admin.AdminUtils
|
||||||
|
import kafka.api.FetchRequestBuilder
|
||||||
|
import kafka.common.{ErrorMapping, FailedToSendMessageException}
|
||||||
import kafka.consumer.SimpleConsumer
|
import kafka.consumer.SimpleConsumer
|
||||||
import kafka.message.Message
|
import kafka.message.Message
|
||||||
|
import kafka.serializer.StringEncoder
|
||||||
import kafka.server.{KafkaConfig, KafkaRequestHandler, KafkaServer}
|
import kafka.server.{KafkaConfig, KafkaRequestHandler, KafkaServer}
|
||||||
|
import kafka.utils._
|
||||||
import kafka.zk.ZooKeeperTestHarness
|
import kafka.zk.ZooKeeperTestHarness
|
||||||
import org.apache.log4j.{Level, Logger}
|
import org.apache.log4j.{Level, Logger}
|
||||||
import org.junit.Test
|
import org.junit.Assert._
|
||||||
import kafka.utils._
|
import org.junit.{After, Before, Test}
|
||||||
import java.util
|
import org.scalatest.exceptions.TestFailedException
|
||||||
import kafka.admin.AdminUtils
|
|
||||||
import util.Properties
|
|
||||||
import kafka.api.FetchRequestBuilder
|
|
||||||
import org.junit.Assert.assertTrue
|
|
||||||
import org.junit.Assert.assertFalse
|
|
||||||
import org.junit.Assert.assertEquals
|
|
||||||
import kafka.common.{ErrorMapping, FailedToSendMessageException}
|
|
||||||
import kafka.serializer.StringEncoder
|
|
||||||
|
|
||||||
class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
|
class ProducerTest extends ZooKeeperTestHarness with Logging{
|
||||||
private val brokerId1 = 0
|
private val brokerId1 = 0
|
||||||
private val brokerId2 = 1
|
private val brokerId2 = 1
|
||||||
private var server1: KafkaServer = null
|
private var server1: KafkaServer = null
|
||||||
|
@ -60,6 +58,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
|
||||||
consumer2
|
consumer2
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Before
|
||||||
override def setUp() {
|
override def setUp() {
|
||||||
super.setUp()
|
super.setUp()
|
||||||
// set up 2 brokers with 4 partitions each
|
// set up 2 brokers with 4 partitions each
|
||||||
|
@ -81,6 +80,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
|
||||||
requestHandlerLogger.setLevel(Level.FATAL)
|
requestHandlerLogger.setLevel(Level.FATAL)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
override def tearDown() {
|
override def tearDown() {
|
||||||
// restore set request handler logger to a higher level
|
// restore set request handler logger to a higher level
|
||||||
requestHandlerLogger.setLevel(Level.ERROR)
|
requestHandlerLogger.setLevel(Level.ERROR)
|
||||||
|
|
|
@ -20,7 +20,7 @@ package kafka.producer
|
||||||
import java.net.SocketTimeoutException
|
import java.net.SocketTimeoutException
|
||||||
import java.util.Properties
|
import java.util.Properties
|
||||||
|
|
||||||
import junit.framework.Assert
|
import org.junit.Assert
|
||||||
import kafka.admin.AdminUtils
|
import kafka.admin.AdminUtils
|
||||||
import kafka.api.ProducerResponseStatus
|
import kafka.api.ProducerResponseStatus
|
||||||
import kafka.common.{ErrorMapping, TopicAndPartition}
|
import kafka.common.{ErrorMapping, TopicAndPartition}
|
||||||
|
@ -30,9 +30,8 @@ import kafka.server.KafkaConfig
|
||||||
import kafka.utils._
|
import kafka.utils._
|
||||||
import org.apache.kafka.common.protocol.SecurityProtocol
|
import org.apache.kafka.common.protocol.SecurityProtocol
|
||||||
import org.junit.Test
|
import org.junit.Test
|
||||||
import org.scalatest.junit.JUnit3Suite
|
|
||||||
|
|
||||||
class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
|
class SyncProducerTest extends KafkaServerTestHarness {
|
||||||
private val messageBytes = new Array[Byte](2)
|
private val messageBytes = new Array[Byte](2)
|
||||||
// turning off controlled shutdown since testProducerCanTimeout() explicitly shuts down request handler pool.
|
// turning off controlled shutdown since testProducerCanTimeout() explicitly shuts down request handler pool.
|
||||||
def generateConfigs() = List(KafkaConfig.fromProps(TestUtils.createBrokerConfigs(1, zkConnect, false).head))
|
def generateConfigs() = List(KafkaConfig.fromProps(TestUtils.createBrokerConfigs(1, zkConnect, false).head))
|
||||||
|
|
|
@ -17,18 +17,19 @@
|
||||||
|
|
||||||
package kafka.server
|
package kafka.server
|
||||||
|
|
||||||
import junit.framework.Assert._
|
import org.junit.Assert._
|
||||||
import kafka.utils.{TestUtils, CoreUtils, ZkUtils}
|
import kafka.utils.{TestUtils, CoreUtils, ZkUtils}
|
||||||
import kafka.zk.ZooKeeperTestHarness
|
import kafka.zk.ZooKeeperTestHarness
|
||||||
import org.apache.kafka.common.protocol.SecurityProtocol
|
import org.apache.kafka.common.protocol.SecurityProtocol
|
||||||
import org.scalatest.junit.JUnit3Suite
|
import org.junit.{After, Before}
|
||||||
|
|
||||||
class AdvertiseBrokerTest extends JUnit3Suite with ZooKeeperTestHarness {
|
class AdvertiseBrokerTest extends ZooKeeperTestHarness {
|
||||||
var server : KafkaServer = null
|
var server : KafkaServer = null
|
||||||
val brokerId = 0
|
val brokerId = 0
|
||||||
val advertisedHostName = "routable-host"
|
val advertisedHostName = "routable-host"
|
||||||
val advertisedPort = 1234
|
val advertisedPort = 1234
|
||||||
|
|
||||||
|
@Before
|
||||||
override def setUp() {
|
override def setUp() {
|
||||||
super.setUp()
|
super.setUp()
|
||||||
|
|
||||||
|
@ -39,6 +40,7 @@ class AdvertiseBrokerTest extends JUnit3Suite with ZooKeeperTestHarness {
|
||||||
server = TestUtils.createServer(KafkaConfig.fromProps(props))
|
server = TestUtils.createServer(KafkaConfig.fromProps(props))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
override def tearDown() {
|
override def tearDown() {
|
||||||
server.shutdown()
|
server.shutdown()
|
||||||
CoreUtils.rm(server.config.logDirs)
|
CoreUtils.rm(server.config.logDirs)
|
||||||
|
|
|
@ -17,22 +17,21 @@
|
||||||
|
|
||||||
package kafka.server
|
package kafka.server
|
||||||
|
|
||||||
import org.junit.Test
|
import org.junit.{After, Before, Test}
|
||||||
import org.scalatest.junit.JUnit3Suite
|
import org.junit.Assert._
|
||||||
import junit.framework.Assert._
|
|
||||||
|
|
||||||
class DelayedOperationTest extends JUnit3Suite {
|
class DelayedOperationTest {
|
||||||
|
|
||||||
var purgatory: DelayedOperationPurgatory[MockDelayedOperation] = null
|
var purgatory: DelayedOperationPurgatory[MockDelayedOperation] = null
|
||||||
|
|
||||||
override def setUp() {
|
@Before
|
||||||
super.setUp()
|
def setUp() {
|
||||||
purgatory = new DelayedOperationPurgatory[MockDelayedOperation](purgatoryName = "mock")
|
purgatory = new DelayedOperationPurgatory[MockDelayedOperation](purgatoryName = "mock")
|
||||||
}
|
}
|
||||||
|
|
||||||
override def tearDown() {
|
@After
|
||||||
|
def tearDown() {
|
||||||
purgatory.shutdown()
|
purgatory.shutdown()
|
||||||
super.tearDown()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
|
@ -26,9 +26,8 @@ import kafka.utils._
|
||||||
import kafka.common._
|
import kafka.common._
|
||||||
import kafka.log.LogConfig
|
import kafka.log.LogConfig
|
||||||
import kafka.admin.{AdminOperationException, AdminUtils}
|
import kafka.admin.{AdminOperationException, AdminUtils}
|
||||||
import org.scalatest.junit.JUnit3Suite
|
|
||||||
|
|
||||||
class DynamicConfigChangeTest extends JUnit3Suite with KafkaServerTestHarness {
|
class DynamicConfigChangeTest extends KafkaServerTestHarness {
|
||||||
def generateConfigs() = List(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect)))
|
def generateConfigs() = List(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect)))
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
|
@ -19,7 +19,6 @@ package kafka.server
|
||||||
import kafka.log._
|
import kafka.log._
|
||||||
import java.io.File
|
import java.io.File
|
||||||
import org.I0Itec.zkclient.ZkClient
|
import org.I0Itec.zkclient.ZkClient
|
||||||
import org.scalatest.junit.JUnit3Suite
|
|
||||||
import org.easymock.EasyMock
|
import org.easymock.EasyMock
|
||||||
import org.junit._
|
import org.junit._
|
||||||
import org.junit.Assert._
|
import org.junit.Assert._
|
||||||
|
@ -28,7 +27,7 @@ import kafka.cluster.Replica
|
||||||
import kafka.utils.{SystemTime, KafkaScheduler, TestUtils, MockTime, CoreUtils}
|
import kafka.utils.{SystemTime, KafkaScheduler, TestUtils, MockTime, CoreUtils}
|
||||||
import java.util.concurrent.atomic.AtomicBoolean
|
import java.util.concurrent.atomic.AtomicBoolean
|
||||||
|
|
||||||
class HighwatermarkPersistenceTest extends JUnit3Suite {
|
class HighwatermarkPersistenceTest {
|
||||||
|
|
||||||
val configs = TestUtils.createBrokerConfigs(2, TestUtils.MockZkConnect).map(KafkaConfig.fromProps)
|
val configs = TestUtils.createBrokerConfigs(2, TestUtils.MockZkConnect).map(KafkaConfig.fromProps)
|
||||||
val topic = "foo"
|
val topic = "foo"
|
||||||
|
|
|
@ -18,7 +18,7 @@ package kafka.server
|
||||||
|
|
||||||
import java.util.Properties
|
import java.util.Properties
|
||||||
|
|
||||||
import org.scalatest.junit.JUnit3Suite
|
import org.junit.{Before, After}
|
||||||
import collection.mutable.HashMap
|
import collection.mutable.HashMap
|
||||||
import collection.mutable.Map
|
import collection.mutable.Map
|
||||||
import kafka.cluster.{Partition, Replica}
|
import kafka.cluster.{Partition, Replica}
|
||||||
|
@ -30,7 +30,7 @@ import java.util.concurrent.atomic.AtomicBoolean
|
||||||
import kafka.message.MessageSet
|
import kafka.message.MessageSet
|
||||||
|
|
||||||
|
|
||||||
class IsrExpirationTest extends JUnit3Suite {
|
class IsrExpirationTest {
|
||||||
|
|
||||||
var topicPartitionIsr: Map[(String, Int), Seq[Int]] = new HashMap[(String, Int), Seq[Int]]()
|
var topicPartitionIsr: Map[(String, Int), Seq[Int]] = new HashMap[(String, Int), Seq[Int]]()
|
||||||
val replicaLagTimeMaxMs = 100L
|
val replicaLagTimeMaxMs = 100L
|
||||||
|
@ -46,14 +46,14 @@ class IsrExpirationTest extends JUnit3Suite {
|
||||||
|
|
||||||
var replicaManager: ReplicaManager = null
|
var replicaManager: ReplicaManager = null
|
||||||
|
|
||||||
override def setUp() {
|
@Before
|
||||||
super.setUp()
|
def setUp() {
|
||||||
replicaManager = new ReplicaManager(configs.head, time, null, null, null, new AtomicBoolean(false))
|
replicaManager = new ReplicaManager(configs.head, time, null, null, null, new AtomicBoolean(false))
|
||||||
}
|
}
|
||||||
|
|
||||||
override def tearDown() {
|
@After
|
||||||
|
def tearDown() {
|
||||||
replicaManager.shutdown(false)
|
replicaManager.shutdown(false)
|
||||||
super.tearDown()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
|
|
@ -26,9 +26,9 @@ import kafka.utils.{TestUtils, CoreUtils}
|
||||||
import org.apache.kafka.common.config.ConfigException
|
import org.apache.kafka.common.config.ConfigException
|
||||||
import org.apache.kafka.common.protocol.SecurityProtocol
|
import org.apache.kafka.common.protocol.SecurityProtocol
|
||||||
import org.junit.{Assert, Test}
|
import org.junit.{Assert, Test}
|
||||||
import org.scalatest.junit.JUnit3Suite
|
import org.scalatest.Assertions.intercept
|
||||||
|
|
||||||
class KafkaConfigTest extends JUnit3Suite {
|
class KafkaConfigTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
def testLogRetentionTimeHoursProvided() {
|
def testLogRetentionTimeHoursProvided() {
|
||||||
|
|
|
@ -17,7 +17,7 @@
|
||||||
|
|
||||||
package kafka.server
|
package kafka.server
|
||||||
|
|
||||||
import junit.framework.Assert._
|
import org.junit.Assert._
|
||||||
import kafka.api._
|
import kafka.api._
|
||||||
import kafka.utils.{TestUtils, ZkUtils, CoreUtils}
|
import kafka.utils.{TestUtils, ZkUtils, CoreUtils}
|
||||||
import kafka.cluster.Broker
|
import kafka.cluster.Broker
|
||||||
|
@ -26,9 +26,9 @@ import kafka.controller.{ControllerChannelManager, ControllerContext, LeaderIsrA
|
||||||
import kafka.utils.TestUtils._
|
import kafka.utils.TestUtils._
|
||||||
import kafka.zk.ZooKeeperTestHarness
|
import kafka.zk.ZooKeeperTestHarness
|
||||||
import org.apache.kafka.common.protocol.SecurityProtocol
|
import org.apache.kafka.common.protocol.SecurityProtocol
|
||||||
import org.scalatest.junit.JUnit3Suite
|
import org.junit.{After, Before}
|
||||||
|
|
||||||
class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
|
class LeaderElectionTest extends ZooKeeperTestHarness {
|
||||||
val brokerId1 = 0
|
val brokerId1 = 0
|
||||||
val brokerId2 = 1
|
val brokerId2 = 1
|
||||||
|
|
||||||
|
@ -36,6 +36,7 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
|
||||||
|
|
||||||
var staleControllerEpochDetected = false
|
var staleControllerEpochDetected = false
|
||||||
|
|
||||||
|
@Before
|
||||||
override def setUp() {
|
override def setUp() {
|
||||||
super.setUp()
|
super.setUp()
|
||||||
|
|
||||||
|
@ -48,6 +49,7 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
|
||||||
servers ++= List(server1, server2)
|
servers ++= List(server1, server2)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
override def tearDown() {
|
override def tearDown() {
|
||||||
servers.foreach(_.shutdown())
|
servers.foreach(_.shutdown())
|
||||||
servers.foreach(server => CoreUtils.rm(server.config.logDirs))
|
servers.foreach(server => CoreUtils.rm(server.config.logDirs))
|
||||||
|
|
|
@ -19,12 +19,11 @@ package kafka.server
|
||||||
|
|
||||||
import java.io.File
|
import java.io.File
|
||||||
import kafka.utils._
|
import kafka.utils._
|
||||||
import junit.framework.Assert._
|
import org.junit.Assert._
|
||||||
import java.util.{Random, Properties}
|
import java.util.{Random, Properties}
|
||||||
import kafka.consumer.SimpleConsumer
|
import kafka.consumer.SimpleConsumer
|
||||||
import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message}
|
import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message}
|
||||||
import kafka.zk.ZooKeeperTestHarness
|
import kafka.zk.ZooKeeperTestHarness
|
||||||
import org.scalatest.junit.JUnit3Suite
|
|
||||||
import kafka.admin.AdminUtils
|
import kafka.admin.AdminUtils
|
||||||
import kafka.api.{PartitionOffsetRequestInfo, FetchRequestBuilder, OffsetRequest}
|
import kafka.api.{PartitionOffsetRequestInfo, FetchRequestBuilder, OffsetRequest}
|
||||||
import kafka.utils.TestUtils._
|
import kafka.utils.TestUtils._
|
||||||
|
@ -33,7 +32,7 @@ import org.junit.After
|
||||||
import org.junit.Before
|
import org.junit.Before
|
||||||
import org.junit.Test
|
import org.junit.Test
|
||||||
|
|
||||||
class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness {
|
class LogOffsetTest extends ZooKeeperTestHarness {
|
||||||
val random = new Random()
|
val random = new Random()
|
||||||
var logDir: File = null
|
var logDir: File = null
|
||||||
var topicLogDir: File = null
|
var topicLogDir: File = null
|
||||||
|
|
|
@ -27,10 +27,10 @@ import kafka.serializer.StringEncoder
|
||||||
|
|
||||||
import java.io.File
|
import java.io.File
|
||||||
|
|
||||||
import org.scalatest.junit.JUnit3Suite
|
import org.junit.{After, Before}
|
||||||
import org.junit.Assert._
|
import org.junit.Assert._
|
||||||
|
|
||||||
class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
|
class LogRecoveryTest extends ZooKeeperTestHarness {
|
||||||
|
|
||||||
val replicaLagTimeMaxMs = 5000L
|
val replicaLagTimeMaxMs = 5000L
|
||||||
val replicaLagMaxMessages = 10L
|
val replicaLagMaxMessages = 10L
|
||||||
|
@ -69,6 +69,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
|
||||||
keyEncoder = classOf[IntEncoder].getName)
|
keyEncoder = classOf[IntEncoder].getName)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Before
|
||||||
override def setUp() {
|
override def setUp() {
|
||||||
super.setUp()
|
super.setUp()
|
||||||
|
|
||||||
|
@ -86,6 +87,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
|
||||||
updateProducer()
|
updateProducer()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
override def tearDown() {
|
override def tearDown() {
|
||||||
producer.close()
|
producer.close()
|
||||||
for(server <- servers) {
|
for(server <- servers) {
|
||||||
|
|
|
@ -25,7 +25,6 @@ import kafka.utils.TestUtils._
|
||||||
import kafka.zk.ZooKeeperTestHarness
|
import kafka.zk.ZooKeeperTestHarness
|
||||||
|
|
||||||
import org.junit.{After, Before, Test}
|
import org.junit.{After, Before, Test}
|
||||||
import org.scalatest.junit.JUnit3Suite
|
|
||||||
|
|
||||||
import java.util.Properties
|
import java.util.Properties
|
||||||
import java.io.File
|
import java.io.File
|
||||||
|
@ -33,9 +32,9 @@ import java.io.File
|
||||||
import scala.util.Random
|
import scala.util.Random
|
||||||
import scala.collection._
|
import scala.collection._
|
||||||
|
|
||||||
import junit.framework.Assert._
|
import org.junit.Assert._
|
||||||
|
|
||||||
class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
|
class OffsetCommitTest extends ZooKeeperTestHarness {
|
||||||
val random: Random = new Random()
|
val random: Random = new Random()
|
||||||
val group = "test-group"
|
val group = "test-group"
|
||||||
val retentionCheckInterval: Long = 100L
|
val retentionCheckInterval: Long = 100L
|
||||||
|
|
|
@ -17,7 +17,7 @@
|
||||||
|
|
||||||
package kafka.server
|
package kafka.server
|
||||||
|
|
||||||
import org.scalatest.junit.JUnit3Suite
|
import org.junit.{After, Before}
|
||||||
import kafka.zk.ZooKeeperTestHarness
|
import kafka.zk.ZooKeeperTestHarness
|
||||||
import kafka.utils.TestUtils._
|
import kafka.utils.TestUtils._
|
||||||
import kafka.producer.KeyedMessage
|
import kafka.producer.KeyedMessage
|
||||||
|
@ -25,11 +25,12 @@ import kafka.serializer.StringEncoder
|
||||||
import kafka.utils.{TestUtils}
|
import kafka.utils.{TestUtils}
|
||||||
import kafka.common._
|
import kafka.common._
|
||||||
|
|
||||||
class ReplicaFetchTest extends JUnit3Suite with ZooKeeperTestHarness {
|
class ReplicaFetchTest extends ZooKeeperTestHarness {
|
||||||
var brokers: Seq[KafkaServer] = null
|
var brokers: Seq[KafkaServer] = null
|
||||||
val topic1 = "foo"
|
val topic1 = "foo"
|
||||||
val topic2 = "bar"
|
val topic2 = "bar"
|
||||||
|
|
||||||
|
@Before
|
||||||
override def setUp() {
|
override def setUp() {
|
||||||
super.setUp()
|
super.setUp()
|
||||||
brokers = createBrokerConfigs(2, zkConnect, false)
|
brokers = createBrokerConfigs(2, zkConnect, false)
|
||||||
|
@ -37,6 +38,7 @@ class ReplicaFetchTest extends JUnit3Suite with ZooKeeperTestHarness {
|
||||||
.map(config => TestUtils.createServer(config))
|
.map(config => TestUtils.createServer(config))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
override def tearDown() {
|
override def tearDown() {
|
||||||
brokers.foreach(_.shutdown())
|
brokers.foreach(_.shutdown())
|
||||||
super.tearDown()
|
super.tearDown()
|
||||||
|
|
|
@ -27,12 +27,11 @@ import java.io.File
|
||||||
import org.apache.kafka.common.protocol.Errors
|
import org.apache.kafka.common.protocol.Errors
|
||||||
import org.easymock.EasyMock
|
import org.easymock.EasyMock
|
||||||
import org.I0Itec.zkclient.ZkClient
|
import org.I0Itec.zkclient.ZkClient
|
||||||
import org.scalatest.junit.JUnit3Suite
|
|
||||||
import org.junit.Test
|
import org.junit.Test
|
||||||
|
|
||||||
import scala.collection.Map
|
import scala.collection.Map
|
||||||
|
|
||||||
class ReplicaManagerTest extends JUnit3Suite {
|
class ReplicaManagerTest {
|
||||||
|
|
||||||
val topic = "test-topic"
|
val topic = "test-topic"
|
||||||
|
|
||||||
|
@ -84,7 +83,7 @@ class ReplicaManagerTest extends JUnit3Suite {
|
||||||
|
|
||||||
rm.appendMessages(timeout = 0, requiredAcks = 3, internalTopicsAllowed = false, messagesPerPartition = produceRequest.data, responseCallback = callback)
|
rm.appendMessages(timeout = 0, requiredAcks = 3, internalTopicsAllowed = false, messagesPerPartition = produceRequest.data, responseCallback = callback)
|
||||||
|
|
||||||
rm.shutdown(false);
|
rm.shutdown(false)
|
||||||
|
|
||||||
TestUtils.verifyNonDaemonThreadsStatus
|
TestUtils.verifyNonDaemonThreadsStatus
|
||||||
|
|
||||||
|
|
|
@ -20,18 +20,18 @@ import java.util.Properties
|
||||||
|
|
||||||
import kafka.zk.ZooKeeperTestHarness
|
import kafka.zk.ZooKeeperTestHarness
|
||||||
import kafka.utils.{TestUtils, CoreUtils}
|
import kafka.utils.{TestUtils, CoreUtils}
|
||||||
import org.junit.Test
|
import org.junit.{Before, Test}
|
||||||
import org.scalatest.junit.JUnit3Suite
|
import org.junit.Assert._
|
||||||
import junit.framework.Assert._
|
|
||||||
import java.io.File
|
import java.io.File
|
||||||
|
|
||||||
class ServerGenerateBrokerIdTest extends JUnit3Suite with ZooKeeperTestHarness {
|
class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness {
|
||||||
var props1: Properties = null
|
var props1: Properties = null
|
||||||
var config1: KafkaConfig = null
|
var config1: KafkaConfig = null
|
||||||
var props2: Properties = null
|
var props2: Properties = null
|
||||||
var config2: KafkaConfig = null
|
var config2: KafkaConfig = null
|
||||||
val brokerMetaPropsFile = "meta.properties"
|
val brokerMetaPropsFile = "meta.properties"
|
||||||
|
|
||||||
|
@Before
|
||||||
override def setUp() {
|
override def setUp() {
|
||||||
super.setUp()
|
super.setUp()
|
||||||
props1 = TestUtils.createBrokerConfig(-1, zkConnect)
|
props1 = TestUtils.createBrokerConfig(-1, zkConnect)
|
||||||
|
|
|
@ -27,18 +27,18 @@ import kafka.serializer.StringEncoder
|
||||||
|
|
||||||
import java.io.File
|
import java.io.File
|
||||||
|
|
||||||
import org.junit.Test
|
import org.junit.{Before, Test}
|
||||||
import org.scalatest.junit.JUnit3Suite
|
import org.junit.Assert._
|
||||||
import junit.framework.Assert._
|
|
||||||
|
|
||||||
class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
|
class ServerShutdownTest extends ZooKeeperTestHarness {
|
||||||
var config: KafkaConfig = null
|
var config: KafkaConfig = null
|
||||||
val host = "localhost"
|
val host = "localhost"
|
||||||
val topic = "test"
|
val topic = "test"
|
||||||
val sent1 = List("hello", "there")
|
val sent1 = List("hello", "there")
|
||||||
val sent2 = List("more", "messages")
|
val sent2 = List("more", "messages")
|
||||||
|
|
||||||
override def setUp(): Unit = {
|
@Before
|
||||||
|
override def setUp() {
|
||||||
super.setUp()
|
super.setUp()
|
||||||
val props = TestUtils.createBrokerConfig(0, zkConnect)
|
val props = TestUtils.createBrokerConfig(0, zkConnect)
|
||||||
config = KafkaConfig.fromProps(props)
|
config = KafkaConfig.fromProps(props)
|
||||||
|
|
|
@ -17,15 +17,14 @@
|
||||||
|
|
||||||
package kafka.server
|
package kafka.server
|
||||||
|
|
||||||
import org.scalatest.junit.JUnit3Suite
|
|
||||||
import kafka.utils.ZkUtils
|
import kafka.utils.ZkUtils
|
||||||
import kafka.utils.CoreUtils
|
import kafka.utils.CoreUtils
|
||||||
import kafka.utils.TestUtils
|
import kafka.utils.TestUtils
|
||||||
|
|
||||||
import kafka.zk.ZooKeeperTestHarness
|
import kafka.zk.ZooKeeperTestHarness
|
||||||
import junit.framework.Assert._
|
import org.junit.Assert._
|
||||||
|
|
||||||
class ServerStartupTest extends JUnit3Suite with ZooKeeperTestHarness {
|
class ServerStartupTest extends ZooKeeperTestHarness {
|
||||||
|
|
||||||
def testBrokerCreatesZKChroot {
|
def testBrokerCreatesZKChroot {
|
||||||
val brokerId = 0
|
val brokerId = 0
|
||||||
|
|
|
@ -22,18 +22,17 @@ import kafka.cluster.Replica
|
||||||
import kafka.common.TopicAndPartition
|
import kafka.common.TopicAndPartition
|
||||||
import kafka.log.Log
|
import kafka.log.Log
|
||||||
import kafka.message.{MessageSet, ByteBufferMessageSet, Message}
|
import kafka.message.{MessageSet, ByteBufferMessageSet, Message}
|
||||||
|
import org.junit.{After, Before}
|
||||||
|
|
||||||
import scala.Some
|
|
||||||
import java.util.{Properties, Collections}
|
import java.util.{Properties, Collections}
|
||||||
import java.util.concurrent.atomic.AtomicBoolean
|
import java.util.concurrent.atomic.AtomicBoolean
|
||||||
import collection.JavaConversions._
|
import collection.JavaConversions._
|
||||||
|
|
||||||
import org.easymock.EasyMock
|
import org.easymock.EasyMock
|
||||||
import org.I0Itec.zkclient.ZkClient
|
import org.I0Itec.zkclient.ZkClient
|
||||||
import org.scalatest.junit.JUnit3Suite
|
import org.junit.Assert._
|
||||||
import junit.framework.Assert._
|
|
||||||
|
|
||||||
class SimpleFetchTest extends JUnit3Suite {
|
class SimpleFetchTest {
|
||||||
|
|
||||||
val replicaLagTimeMaxMs = 100L
|
val replicaLagTimeMaxMs = 100L
|
||||||
val replicaFetchWaitMaxMs = 100
|
val replicaFetchWaitMaxMs = 100
|
||||||
|
@ -63,9 +62,8 @@ class SimpleFetchTest extends JUnit3Suite {
|
||||||
|
|
||||||
var replicaManager: ReplicaManager = null
|
var replicaManager: ReplicaManager = null
|
||||||
|
|
||||||
override def setUp() {
|
@Before
|
||||||
super.setUp()
|
def setUp() {
|
||||||
|
|
||||||
// create nice mock since we don't particularly care about zkclient calls
|
// create nice mock since we don't particularly care about zkclient calls
|
||||||
val zkClient = EasyMock.createNiceMock(classOf[ZkClient])
|
val zkClient = EasyMock.createNiceMock(classOf[ZkClient])
|
||||||
EasyMock.replay(zkClient)
|
EasyMock.replay(zkClient)
|
||||||
|
@ -117,9 +115,9 @@ class SimpleFetchTest extends JUnit3Suite {
|
||||||
partition.inSyncReplicas = allReplicas.toSet
|
partition.inSyncReplicas = allReplicas.toSet
|
||||||
}
|
}
|
||||||
|
|
||||||
override def tearDown() {
|
@After
|
||||||
|
def tearDown() {
|
||||||
replicaManager.shutdown(false)
|
replicaManager.shutdown(false)
|
||||||
super.tearDown()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -19,7 +19,7 @@ package kafka.utils
|
||||||
|
|
||||||
import java.util.concurrent.TimeUnit
|
import java.util.concurrent.TimeUnit
|
||||||
|
|
||||||
import junit.framework.Assert._
|
import org.junit.Assert._
|
||||||
import org.junit.Test
|
import org.junit.Test
|
||||||
|
|
||||||
class ByteBoundedBlockingQueueTest {
|
class ByteBoundedBlockingQueueTest {
|
||||||
|
|
|
@ -17,7 +17,7 @@
|
||||||
|
|
||||||
package kafka.utils
|
package kafka.utils
|
||||||
|
|
||||||
import junit.framework.Assert._
|
import org.junit.Assert._
|
||||||
import org.junit.Test
|
import org.junit.Test
|
||||||
|
|
||||||
class CommandLineUtilsTest {
|
class CommandLineUtilsTest {
|
||||||
|
|
|
@ -16,7 +16,7 @@
|
||||||
*/
|
*/
|
||||||
package kafka.utils
|
package kafka.utils
|
||||||
|
|
||||||
import junit.framework.Assert._
|
import org.junit.Assert._
|
||||||
import org.scalatest.Assertions
|
import org.scalatest.Assertions
|
||||||
import org.junit.{Test, After, Before}
|
import org.junit.{Test, After, Before}
|
||||||
|
|
||||||
|
|
|
@ -16,7 +16,7 @@
|
||||||
*/
|
*/
|
||||||
package kafka.utils
|
package kafka.utils
|
||||||
|
|
||||||
import junit.framework.Assert._
|
import org.junit.Assert._
|
||||||
import org.junit.{Test, After, Before}
|
import org.junit.{Test, After, Before}
|
||||||
|
|
||||||
class JsonTest {
|
class JsonTest {
|
||||||
|
|
|
@ -22,13 +22,12 @@ import kafka.server.{ReplicaFetcherManager, KafkaConfig}
|
||||||
import kafka.api.LeaderAndIsr
|
import kafka.api.LeaderAndIsr
|
||||||
import kafka.zk.ZooKeeperTestHarness
|
import kafka.zk.ZooKeeperTestHarness
|
||||||
import kafka.common.TopicAndPartition
|
import kafka.common.TopicAndPartition
|
||||||
import org.scalatest.junit.JUnit3Suite
|
|
||||||
import org.junit.Assert._
|
import org.junit.Assert._
|
||||||
import org.junit.Test
|
import org.junit.{Before, Test}
|
||||||
import org.easymock.EasyMock
|
import org.easymock.EasyMock
|
||||||
|
|
||||||
|
|
||||||
class ReplicationUtilsTest extends JUnit3Suite with ZooKeeperTestHarness {
|
class ReplicationUtilsTest extends ZooKeeperTestHarness {
|
||||||
val topic = "my-topic-test"
|
val topic = "my-topic-test"
|
||||||
val partitionId = 0
|
val partitionId = 0
|
||||||
val brokerId = 1
|
val brokerId = 1
|
||||||
|
@ -45,7 +44,7 @@ class ReplicationUtilsTest extends JUnit3Suite with ZooKeeperTestHarness {
|
||||||
|
|
||||||
val topicDataLeaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(LeaderAndIsr(1,leaderEpoch,List(1,2),0), controllerEpoch)
|
val topicDataLeaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(LeaderAndIsr(1,leaderEpoch,List(1,2),0), controllerEpoch)
|
||||||
|
|
||||||
|
@Before
|
||||||
override def setUp() {
|
override def setUp() {
|
||||||
super.setUp()
|
super.setUp()
|
||||||
ZkUtils.createPersistentPath(zkClient,topicPath,topicData)
|
ZkUtils.createPersistentPath(zkClient,topicPath,topicData)
|
||||||
|
|
|
@ -16,7 +16,7 @@
|
||||||
*/
|
*/
|
||||||
package kafka.utils
|
package kafka.utils
|
||||||
|
|
||||||
import junit.framework.Assert._
|
import org.junit.Assert._
|
||||||
import java.util.concurrent.atomic._
|
import java.util.concurrent.atomic._
|
||||||
import org.junit.{Test, After, Before}
|
import org.junit.{Test, After, Before}
|
||||||
import kafka.utils.TestUtils.retry
|
import kafka.utils.TestUtils.retry
|
||||||
|
|
|
@ -43,8 +43,7 @@ import kafka.admin.AdminUtils
|
||||||
import kafka.producer.ProducerConfig
|
import kafka.producer.ProducerConfig
|
||||||
import kafka.log._
|
import kafka.log._
|
||||||
|
|
||||||
import junit.framework.AssertionFailedError
|
import org.junit.Assert._
|
||||||
import junit.framework.Assert._
|
|
||||||
import org.apache.kafka.clients.producer.KafkaProducer
|
import org.apache.kafka.clients.producer.KafkaProducer
|
||||||
|
|
||||||
import scala.collection.Map
|
import scala.collection.Map
|
||||||
|
@ -595,7 +594,7 @@ object TestUtils extends Logging {
|
||||||
block
|
block
|
||||||
return
|
return
|
||||||
} catch {
|
} catch {
|
||||||
case e: AssertionFailedError =>
|
case e: AssertionError =>
|
||||||
val ellapsed = System.currentTimeMillis - startTime
|
val ellapsed = System.currentTimeMillis - startTime
|
||||||
if(ellapsed > maxWaitMs) {
|
if(ellapsed > maxWaitMs) {
|
||||||
throw e
|
throw e
|
||||||
|
|
|
@ -16,7 +16,7 @@
|
||||||
*/
|
*/
|
||||||
package kafka.utils.timer
|
package kafka.utils.timer
|
||||||
|
|
||||||
import junit.framework.Assert._
|
import org.junit.Assert._
|
||||||
import java.util.concurrent.atomic._
|
import java.util.concurrent.atomic._
|
||||||
import org.junit.{Test, After, Before}
|
import org.junit.{Test, After, Before}
|
||||||
|
|
||||||
|
|
|
@ -18,7 +18,7 @@ package kafka.utils.timer
|
||||||
|
|
||||||
import java.util.concurrent.{CountDownLatch, ExecutorService, Executors, TimeUnit}
|
import java.util.concurrent.{CountDownLatch, ExecutorService, Executors, TimeUnit}
|
||||||
|
|
||||||
import junit.framework.Assert._
|
import org.junit.Assert._
|
||||||
import java.util.concurrent.atomic._
|
import java.util.concurrent.atomic._
|
||||||
import org.junit.{Test, After, Before}
|
import org.junit.{Test, After, Before}
|
||||||
|
|
||||||
|
|
|
@ -18,13 +18,11 @@
|
||||||
package kafka.zk
|
package kafka.zk
|
||||||
|
|
||||||
import kafka.consumer.ConsumerConfig
|
import kafka.consumer.ConsumerConfig
|
||||||
import org.I0Itec.zkclient.ZkClient
|
|
||||||
import kafka.utils.ZkUtils
|
import kafka.utils.ZkUtils
|
||||||
import kafka.utils.TestUtils
|
import kafka.utils.TestUtils
|
||||||
import org.junit.Assert
|
import org.junit.Assert
|
||||||
import org.scalatest.junit.JUnit3Suite
|
|
||||||
|
|
||||||
class ZKEphemeralTest extends JUnit3Suite with ZooKeeperTestHarness {
|
class ZKEphemeralTest extends ZooKeeperTestHarness {
|
||||||
var zkSessionTimeoutMs = 1000
|
var zkSessionTimeoutMs = 1000
|
||||||
|
|
||||||
def testEphemeralNodeCleanup = {
|
def testEphemeralNodeCleanup = {
|
||||||
|
|
|
@ -17,13 +17,12 @@
|
||||||
|
|
||||||
package kafka.zk
|
package kafka.zk
|
||||||
|
|
||||||
import junit.framework.Assert
|
|
||||||
import kafka.consumer.ConsumerConfig
|
import kafka.consumer.ConsumerConfig
|
||||||
import kafka.utils.{ZkPath, TestUtils, ZkUtils}
|
import kafka.utils.{ZkPath, TestUtils, ZkUtils}
|
||||||
import org.apache.kafka.common.config.ConfigException
|
import org.apache.kafka.common.config.ConfigException
|
||||||
import org.scalatest.junit.JUnit3Suite
|
import org.junit.Assert._
|
||||||
|
|
||||||
class ZKPathTest extends JUnit3Suite with ZooKeeperTestHarness {
|
class ZKPathTest extends ZooKeeperTestHarness {
|
||||||
|
|
||||||
val path: String = "/some_dir"
|
val path: String = "/some_dir"
|
||||||
val zkSessionTimeoutMs = 1000
|
val zkSessionTimeoutMs = 1000
|
||||||
|
@ -54,7 +53,7 @@ class ZKPathTest extends JUnit3Suite with ZooKeeperTestHarness {
|
||||||
case exception: Throwable => fail("Failed to create persistent path")
|
case exception: Throwable => fail("Failed to create persistent path")
|
||||||
}
|
}
|
||||||
|
|
||||||
Assert.assertTrue("Failed to create persistent path", ZkUtils.pathExists(zkClient, path))
|
assertTrue("Failed to create persistent path", ZkUtils.pathExists(zkClient, path))
|
||||||
}
|
}
|
||||||
|
|
||||||
def testMakeSurePersistsPathExistsThrowsException {
|
def testMakeSurePersistsPathExistsThrowsException {
|
||||||
|
@ -82,7 +81,7 @@ class ZKPathTest extends JUnit3Suite with ZooKeeperTestHarness {
|
||||||
case exception: Throwable => fail("Failed to create persistent path")
|
case exception: Throwable => fail("Failed to create persistent path")
|
||||||
}
|
}
|
||||||
|
|
||||||
Assert.assertTrue("Failed to create persistent path", ZkUtils.pathExists(zkClient, path))
|
assertTrue("Failed to create persistent path", ZkUtils.pathExists(zkClient, path))
|
||||||
}
|
}
|
||||||
|
|
||||||
def testCreateEphemeralPathThrowsException {
|
def testCreateEphemeralPathThrowsException {
|
||||||
|
@ -110,7 +109,7 @@ class ZKPathTest extends JUnit3Suite with ZooKeeperTestHarness {
|
||||||
case exception: Throwable => fail("Failed to create ephemeral path")
|
case exception: Throwable => fail("Failed to create ephemeral path")
|
||||||
}
|
}
|
||||||
|
|
||||||
Assert.assertTrue("Failed to create ephemeral path", ZkUtils.pathExists(zkClient, path))
|
assertTrue("Failed to create ephemeral path", ZkUtils.pathExists(zkClient, path))
|
||||||
}
|
}
|
||||||
|
|
||||||
def testCreatePersistentSequentialThrowsException {
|
def testCreatePersistentSequentialThrowsException {
|
||||||
|
@ -140,6 +139,6 @@ class ZKPathTest extends JUnit3Suite with ZooKeeperTestHarness {
|
||||||
case exception: Throwable => fail("Failed to create persistent path")
|
case exception: Throwable => fail("Failed to create persistent path")
|
||||||
}
|
}
|
||||||
|
|
||||||
Assert.assertTrue("Failed to create persistent path", ZkUtils.pathExists(zkClient, actualPath))
|
assertTrue("Failed to create persistent path", ZkUtils.pathExists(zkClient, actualPath))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,11 +17,12 @@
|
||||||
|
|
||||||
package kafka.zk
|
package kafka.zk
|
||||||
|
|
||||||
import org.scalatest.junit.JUnit3Suite
|
|
||||||
import org.I0Itec.zkclient.ZkClient
|
import org.I0Itec.zkclient.ZkClient
|
||||||
import kafka.utils.{ZkUtils, CoreUtils}
|
import kafka.utils.{ZkUtils, CoreUtils}
|
||||||
|
import org.junit.{After, Before}
|
||||||
|
import org.scalatest.junit.JUnitSuite
|
||||||
|
|
||||||
trait ZooKeeperTestHarness extends JUnit3Suite {
|
trait ZooKeeperTestHarness extends JUnitSuite {
|
||||||
var zkPort: Int = -1
|
var zkPort: Int = -1
|
||||||
var zookeeper: EmbeddedZookeeper = null
|
var zookeeper: EmbeddedZookeeper = null
|
||||||
var zkClient: ZkClient = null
|
var zkClient: ZkClient = null
|
||||||
|
@ -30,17 +31,17 @@ trait ZooKeeperTestHarness extends JUnit3Suite {
|
||||||
|
|
||||||
def zkConnect: String = "127.0.0.1:" + zkPort
|
def zkConnect: String = "127.0.0.1:" + zkPort
|
||||||
|
|
||||||
override def setUp() {
|
@Before
|
||||||
super.setUp
|
def setUp() {
|
||||||
zookeeper = new EmbeddedZookeeper()
|
zookeeper = new EmbeddedZookeeper()
|
||||||
zkPort = zookeeper.port
|
zkPort = zookeeper.port
|
||||||
zkClient = ZkUtils.createZkClient(zkConnect, zkSessionTimeout, zkConnectionTimeout)
|
zkClient = ZkUtils.createZkClient(zkConnect, zkSessionTimeout, zkConnectionTimeout)
|
||||||
}
|
}
|
||||||
|
|
||||||
override def tearDown() {
|
@After
|
||||||
|
def tearDown() {
|
||||||
CoreUtils.swallow(zkClient.close())
|
CoreUtils.swallow(zkClient.close())
|
||||||
CoreUtils.swallow(zookeeper.shutdown())
|
CoreUtils.swallow(zookeeper.shutdown())
|
||||||
super.tearDown
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue