KAFKA-1782: Follow up - add missing @Test annotations.

Author: Ewen Cheslack-Postava
Date:   2015-08-14 15:43:56 -07:00
parent c8e62c9818
commit 1dcaf39d48
24 changed files with 104 additions and 18 deletions
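For context: these test suites run on JUnit 4, whose runner executes only methods annotated with @Test. A method that merely follows the JUnit 3 testFoo naming convention still compiles, but the runner silently skips it, so the build stays green while the test never runs. A minimal sketch of the failure mode, using a hypothetical class that is not part of this patch:

import org.junit.Test
import org.junit.Assert._

class AnnotationExampleTest {
  // No annotation: the JUnit 4 runner never invokes this method,
  // so the failing assertion below goes completely unnoticed.
  def testSilentlySkipped() {
    assertEquals(1, 2)
  }

  // With @Test the runner picks the method up and reports failures.
  @Test
  def testActuallyRuns() {
    assertEquals(2, 1 + 1)
  }
}

That is the gap this follow-up closes: each method annotated below was already written as a test but was not being executed.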

View File

@@ -19,7 +19,7 @@ import org.apache.kafka.clients.consumer._
 import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.TopicPartition
 import org.junit.Assert._
-import org.junit.Before
+import org.junit.{Test, Before}
 import scala.collection.JavaConversions._
@@ -61,6 +61,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
     TestUtils.createTopic(this.zkClient, topic, 1, serverCount, this.servers)
   }
+  @Test
   def testConsumptionWithBrokerFailures() = consumeWithBrokerFailures(10)
   /*
@@ -96,6 +97,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
     scheduler.shutdown()
   }
+  @Test
   def testSeekAndCommitWithBrokerFailures() = seekAndCommitWithBrokerFailures(5)
   def seekAndCommitWithBrokerFailures(numIters: Int) {

View File

@@ -25,7 +25,7 @@ import kafka.server.KafkaConfig
 import java.util.ArrayList
 import org.junit.Assert._
-import org.junit.Before
+import org.junit.{Test, Before}
 import scala.collection.JavaConverters._
 import kafka.coordinator.ConsumerCoordinator
@@ -65,6 +65,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
     TestUtils.createTopic(this.zkClient, topic, 2, serverCount, this.servers)
   }
+  @Test
   def testSimpleConsumption() {
     val numRecords = 10000
     sendRecords(numRecords)
@@ -86,6 +87,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
     awaitCommitCallback(this.consumers(0), commitCallback)
   }
+  @Test
   def testCommitSpecifiedOffsets() {
     sendRecords(5, tp)
     sendRecords(7, tp2)
@@ -116,12 +118,14 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
     assertEquals(7, this.consumers(0).committed(tp2))
   }
+  @Test
   def testAutoOffsetReset() {
     sendRecords(1)
     this.consumers(0).subscribe(tp)
     consumeRecords(this.consumers(0), numRecords = 1, startingOffset = 0)
   }
+  @Test
   def testSeek() {
     val consumer = this.consumers(0)
     val totalRecords = 50L
@@ -142,12 +146,14 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
     consumeRecords(consumer, numRecords = 1, startingOffset = mid.toInt)
   }
+  @Test
   def testGroupConsumption() {
     sendRecords(10)
     this.consumers(0).subscribe(topic)
     consumeRecords(this.consumers(0), numRecords = 1, startingOffset = 0)
   }
+  @Test
   def testPositionAndCommit() {
     sendRecords(5)
@@ -179,6 +185,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
     consumeRecords(this.consumers(1), 1, 5)
   }
+  @Test
   def testPartitionsFor() {
     val numParts = 2
     TestUtils.createTopic(this.zkClient, "part-test", numParts, 1, this.servers)
@@ -188,6 +195,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
     assertNull(this.consumers(0).partitionsFor("non-exist-topic"))
   }
+  @Test
   def testListTopics() {
     val numParts = 2
     val topic1: String = "part-test-topic-1"
@@ -206,6 +214,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
     assertEquals(2, topics.get(topic3).size)
   }
+  @Test
   def testPartitionReassignmentCallback() {
     val callback = new TestConsumerReassignmentCallback()
     this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100"); // timeout quickly to avoid slow test
@@ -238,6 +247,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
     consumer0.close()
   }
+  @Test
   def testUnsubscribeTopic() {
     val callback = new TestConsumerReassignmentCallback()
     this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100"); // timeout quickly to avoid slow test
@@ -258,6 +268,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
     }
   }
+  @Test
   def testExpandingTopicSubscriptions() {
     val otherTopic = "other"
     val subscriptions = Set(new TopicPartition(topic, 0), new TopicPartition(topic, 1))
@@ -276,6 +287,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
     }, s"Expected partitions ${expandedSubscriptions.asJava} but actually got ${this.consumers(0).subscriptions}")
   }
+  @Test
   def testShrinkingTopicSubscriptions() {
     val otherTopic = "other"
     TestUtils.createTopic(this.zkClient, otherTopic, 2, serverCount, this.servers)
@@ -294,6 +306,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
     }, s"Expected partitions ${shrunkenSubscriptions.asJava} but actually got ${this.consumers(0).subscriptions}")
   }
+  @Test
   def testPartitionPauseAndResume() {
     sendRecords(5)
     this.consumers(0).subscribe(tp)
@@ -305,6 +318,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
     consumeRecords(this.consumers(0), 5, 5)
   }
+  @Test
   def testPauseStateNotPreservedByRebalance() {
     this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100"); // timeout quickly to avoid slow test
     this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "30");

View File

@@ -25,7 +25,7 @@ import kafka.utils.{ZkUtils, CoreUtils, TestUtils}
 import kafka.cluster.Broker
 import kafka.client.ClientUtils
 import kafka.server.{KafkaConfig, KafkaServer}
-import org.junit.{After, Before}
+import org.junit.{Test, After, Before}
 class AddPartitionsTest extends ZooKeeperTestHarness {
   var configs: Seq[KafkaConfig] = null
@@ -62,6 +62,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
     super.tearDown()
   }
+  @Test
   def testTopicDoesNotExist {
     try {
       AdminUtils.addPartitions(zkClient, "Blah", 1)
@@ -72,6 +73,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
     }
   }
+  @Test
   def testWrongReplicaCount {
     try {
       AdminUtils.addPartitions(zkClient, topic1, 2, "0:1,0:1:2")
@@ -82,6 +84,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
     }
   }
+  @Test
   def testIncrementPartitions {
     AdminUtils.addPartitions(zkClient, topic1, 3)
     // wait until leader is elected
@@ -107,6 +110,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
     assert(replicas.contains(partitionDataForTopic1(1).leader.get))
   }
+  @Test
   def testManualAssignmentOfReplicas {
     AdminUtils.addPartitions(zkClient, topic2, 3, "1:2,0:1,2:3")
     // wait until leader is elected
@@ -133,6 +137,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
     assert(replicas(1).id == 0 || replicas(1).id == 1)
   }
+  @Test
   def testReplicaPlacement {
     AdminUtils.addPartitions(zkClient, topic3, 7)

View File

@@ -26,9 +26,11 @@ import kafka.common.TopicAndPartition
 import kafka.consumer.PartitionAssignorTest.StaticSubscriptionInfo
 import kafka.consumer.PartitionAssignorTest.Scenario
 import kafka.consumer.PartitionAssignorTest.WildcardSubscriptionInfo
+import org.junit.Test
 class PartitionAssignorTest extends Logging {
+  @Test
   def testRoundRobinPartitionAssignor() {
     val assignor = new RoundRobinAssignor
@@ -52,6 +54,7 @@ class PartitionAssignorTest extends Logging {
     })
   }
+  @Test
   def testRangePartitionAssignor() {
     val assignor = new RangeAssignor
     (1 to PartitionAssignorTest.TestCaseCount).foreach (testCase => {

View File

@@ -30,7 +30,7 @@ import kafka.utils.TestUtils._
 import kafka.utils._
 import org.I0Itec.zkclient.ZkClient
 import org.apache.log4j.{Level, Logger}
-import org.junit.{After, Before}
+import org.junit.{Test, After, Before}
 import scala.collection._
@@ -65,6 +65,7 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
     super.tearDown()
   }
+  @Test
   def testBasic() {
     val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
     requestHandlerLogger.setLevel(Level.FATAL)
@@ -175,7 +176,7 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
     requestHandlerLogger.setLevel(Level.ERROR)
   }
+  @Test
   def testCompression() {
     val requestHandlerLogger = Logger.getLogger(classOf[kafka.server.KafkaRequestHandler])
     requestHandlerLogger.setLevel(Level.FATAL)
@@ -255,6 +256,7 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
     requestHandlerLogger.setLevel(Level.ERROR)
   }
+  @Test
   def testCompressionSetConsumption() {
     // send some messages to each broker
     val sentMessages = sendMessages(servers, topic, 200, 0, DefaultCompressionCodec) ++
@@ -278,6 +280,7 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
     zkConsumerConnector1.shutdown
   }
+  @Test
   def testConsumerDecoder() {
     val requestHandlerLogger = Logger.getLogger(classOf[kafka.server.KafkaRequestHandler])
     requestHandlerLogger.setLevel(Level.FATAL)
@@ -317,6 +320,7 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
     requestHandlerLogger.setLevel(Level.ERROR)
   }
+  @Test
   def testLeaderSelectionForPartition() {
     val zkClient = ZkUtils.createZkClient(zkConnect, 6000, 30000)
@@ -348,6 +352,7 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
     zkClient.close()
   }
+  @Test
   def testConsumerRebalanceListener() {
     // Send messages to create topic
     sendMessages(servers, topic, nMessages, 0)

View File

@@ -66,6 +66,7 @@ class ControllerFailoverTest extends KafkaServerTestHarness with Logging {
    * See @link{https://issues.apache.org/jira/browse/KAFKA-2300}
    * for the background of this test case
    */
+  @Test
   def testMetadataUpdate() {
     log.setLevel(Level.INFO)
     var controller: KafkaServer = this.servers.head;

View File

@@ -19,7 +19,7 @@ package kafka.integration
 import java.util.concurrent._
 import java.util.concurrent.atomic._
-import org.junit.{After, Before}
+import org.junit.{Test, After, Before}
 import scala.collection._
 import org.junit.Assert._
@@ -65,6 +65,7 @@ class FetcherTest extends KafkaServerTestHarness {
     super.tearDown
   }
+  @Test
   def testFetcher() {
     val perNode = 2
     var count = TestUtils.sendMessages(servers, topic, perNode).size

View File

@@ -21,6 +21,7 @@ import java.util.Properties
 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
+import org.junit.Test
 class MinIsrConfigTest extends KafkaServerTestHarness {
@@ -28,6 +29,7 @@ class MinIsrConfigTest extends KafkaServerTestHarness {
   overridingProps.put(KafkaConfig.MinInSyncReplicasProp, "5")
   def generateConfigs() = TestUtils.createBrokerConfigs(1, zkConnect).map(KafkaConfig.fromProps(_, overridingProps))
+  @Test
   def testDefaultKafkaConfig() {
     assert(servers.head.getLogManager().defaultConfig.minInSyncReplicas == 5)
   }

View File

@@ -24,6 +24,7 @@ import kafka.server.{KafkaRequestHandler, KafkaConfig}
 import kafka.producer.{KeyedMessage, Producer}
 import org.apache.log4j.{Level, Logger}
 import kafka.zk.ZooKeeperTestHarness
+import org.junit.Test
 import scala.collection._
 import kafka.common.{TopicAndPartition, ErrorMapping, UnknownTopicOrPartitionException, OffsetOutOfRangeException}
 import kafka.utils.{StaticPartitioner, TestUtils, CoreUtils}
@@ -38,6 +39,7 @@ class PrimitiveApiTest extends ProducerConsumerTestHarness with ZooKeeperTestHar
   def generateConfigs() = List(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect)))
+  @Test
   def testFetchRequestCanProperlySerialize() {
     val request = new FetchRequestBuilder()
       .clientId("test-client")
@@ -54,6 +56,7 @@ class PrimitiveApiTest extends ProducerConsumerTestHarness with ZooKeeperTestHar
     assertEquals(request, deserializedRequest)
   }
+  @Test
   def testEmptyFetchRequest() {
     val partitionRequests = immutable.Map[TopicAndPartition, PartitionFetchInfo]()
     val request = new FetchRequest(requestInfo = partitionRequests)
@@ -61,6 +64,7 @@ class PrimitiveApiTest extends ProducerConsumerTestHarness with ZooKeeperTestHar
     assertTrue(!fetched.hasError && fetched.data.size == 0)
   }
+  @Test
   def testDefaultEncoderProducerAndFetch() {
     val topic = "test-topic"
@@ -84,6 +88,7 @@ class PrimitiveApiTest extends ProducerConsumerTestHarness with ZooKeeperTestHar
     assertEquals("test-message", TestUtils.readString(fetchedMessageAndOffset.message.payload, "UTF-8"))
   }
+  @Test
   def testDefaultEncoderProducerAndFetchWithCompression() {
     val topic = "test-topic"
     val props = new Properties()
@@ -170,6 +175,7 @@ class PrimitiveApiTest extends ProducerConsumerTestHarness with ZooKeeperTestHar
     requestHandlerLogger.setLevel(Level.ERROR)
   }
+  @Test
   def testProduceAndMultiFetch() {
     produceAndMultiFetch(producer)
   }
@@ -196,10 +202,12 @@ class PrimitiveApiTest extends ProducerConsumerTestHarness with ZooKeeperTestHar
     }
   }
+  @Test
   def testMultiProduce() {
     multiProduce(producer)
   }
+  @Test
   def testConsumerEmptyTopic() {
     val newTopic = "new-topic"
     TestUtils.createTopic(zkClient, newTopic, numPartitions = 1, replicationFactor = 1, servers = servers)
@@ -208,6 +216,7 @@ class PrimitiveApiTest extends ProducerConsumerTestHarness with ZooKeeperTestHar
     assertFalse(fetchResponse.messageSet(newTopic, 0).iterator.hasNext)
   }
+  @Test
   def testPipelinedProduceRequests() {
     val topics = Map("test4" -> 0, "test1" -> 0, "test2" -> 0, "test3" -> 0)
     topics.keys.map(topic => TestUtils.createTopic(zkClient, topic, servers = servers))

View File

@@ -17,7 +17,7 @@
 package kafka.integration
-import org.junit.{After, Before}
+import org.junit.{Test, After, Before}
 import kafka.zk.ZooKeeperTestHarness
 import kafka.utils.TestUtils._
 import org.junit.Assert._
@@ -47,6 +47,7 @@ class RollingBounceTest extends ZooKeeperTestHarness {
     super.tearDown()
   }
+  @Test
   def testRollingBounce {
     // start all the brokers
     val topic1 = "new-topic1"

View File

@@ -30,7 +30,7 @@ import kafka.utils.TestUtils._
 import kafka.zk.ZooKeeperTestHarness
 import org.apache.kafka.common.protocol.SecurityProtocol
 import org.junit.Assert._
-import org.junit.{After, Before}
+import org.junit.{Test, After, Before}
 class TopicMetadataTest extends ZooKeeperTestHarness {
   private var server1: KafkaServer = null
@@ -54,6 +54,7 @@ class TopicMetadataTest extends ZooKeeperTestHarness {
     super.tearDown()
   }
+  @Test
   def testTopicMetadataRequest {
     // create topic
     val topic = "test"
@@ -70,6 +71,7 @@ class TopicMetadataTest extends ZooKeeperTestHarness {
     assertEquals(topicMetadataRequest, deserializedMetadataRequest)
   }
+  @Test
   def testBasicTopicMetadata {
     // create topic
     val topic = "test"
@@ -87,6 +89,7 @@ class TopicMetadataTest extends ZooKeeperTestHarness {
     assertEquals(1, partitionMetadata.head.replicas.size)
   }
+  @Test
   def testGetAllTopicMetadata {
     // create topic
     val topic1 = "testGetAllTopicMetadata1"
@@ -111,6 +114,7 @@ class TopicMetadataTest extends ZooKeeperTestHarness {
     assertEquals(1, partitionMetadataTopic2.head.replicas.size)
   }
+  @Test
   def testAutoCreateTopic {
     // auto create topic
     val topic = "testAutoCreateTopic"
@@ -137,6 +141,7 @@ class TopicMetadataTest extends ZooKeeperTestHarness {
     assertTrue(partitionMetadata.head.leader.isDefined)
   }
+  @Test
   def testAutoCreateTopicWithCollision {
     // auto create topic
     val topic1 = "testAutoCreate_Topic"
@@ -199,7 +204,7 @@ class TopicMetadataTest extends ZooKeeperTestHarness {
     })
   }
+  @Test
   def testIsrAfterBrokerShutDownAndJoinsBack {
     val numBrokers = 2 //just 2 brokers are enough for the test
@@ -250,10 +255,12 @@ class TopicMetadataTest extends ZooKeeperTestHarness {
   }
+  @Test
   def testAliveBrokerListWithNoTopics {
     checkMetadata(Seq(server1), 1)
   }
+  @Test
   def testAliveBrokersListWithNoTopicsAfterNewBrokerStartup {
     var adHocServers = adHocConfigs.takeRight(adHocConfigs.size - 1).map(p => createServer(p))
@@ -267,6 +274,7 @@ class TopicMetadataTest extends ZooKeeperTestHarness {
   }
+  @Test
   def testAliveBrokersListWithNoTopicsAfterABrokerShutdown {
     val adHocServers = adHocConfigs.map(p => createServer(p))

View File

@@ -18,7 +18,7 @@
 package kafka.integration
 import org.apache.kafka.common.config.ConfigException
-import org.junit.{After, Before}
+import org.junit.{Test, After, Before}
 import scala.util.Random
 import org.apache.log4j.{Level, Logger}
@@ -99,6 +99,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
     }
   }
+  @Test
   def testUncleanLeaderElectionEnabled {
     // unclean leader election is enabled by default
     startBrokers(Seq(configProps1, configProps2))
@@ -109,6 +110,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
     verifyUncleanLeaderElectionEnabled
   }
+  @Test
   def testUncleanLeaderElectionDisabled {
     // disable unclean leader election
     configProps1.put("unclean.leader.election.enable", String.valueOf(false))
@@ -121,6 +123,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
     verifyUncleanLeaderElectionDisabled
   }
+  @Test
   def testUncleanLeaderElectionEnabledByTopicOverride {
     // disable unclean leader election globally, but enable for our specific test topic
     configProps1.put("unclean.leader.election.enable", String.valueOf(false))
@@ -136,6 +139,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
     verifyUncleanLeaderElectionEnabled
   }
+  @Test
   def testCleanLeaderElectionDisabledByTopicOverride {
     // enable unclean leader election globally, but disable for our specific test topic
     configProps1.put("unclean.leader.election.enable", String.valueOf(true))
@@ -151,6 +155,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
     verifyUncleanLeaderElectionDisabled
   }
+  @Test
   def testUncleanLeaderElectionInvalidTopicOverride {
     startBrokers(Seq(configProps1))

View File

@@ -29,6 +29,7 @@ import kafka.utils.{Logging, TestUtils}
 import kafka.consumer.{KafkaStream, ConsumerConfig}
 import kafka.zk.ZooKeeperTestHarness
 import kafka.common.MessageStreamsExistException
+import org.junit.Test
 import scala.collection.JavaConversions
@@ -50,6 +51,7 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with ZooKeep
   val consumer1 = "consumer1"
   val nMessages = 2
+  @Test
   def testBasic() {
     val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
     requestHandlerLogger.setLevel(Level.FATAL)

View File

@@ -21,7 +21,7 @@ import org.junit.Assert._
 import kafka.utils.{TestUtils, CoreUtils, ZkUtils}
 import kafka.zk.ZooKeeperTestHarness
 import org.apache.kafka.common.protocol.SecurityProtocol
-import org.junit.{After, Before}
+import org.junit.{Test, After, Before}
 class AdvertiseBrokerTest extends ZooKeeperTestHarness {
   var server : KafkaServer = null
@@ -46,7 +46,8 @@ class AdvertiseBrokerTest extends ZooKeeperTestHarness {
     CoreUtils.rm(server.config.logDirs)
     super.tearDown()
   }
+  @Test
   def testBrokerAdvertiseToZK {
     val brokerInfo = ZkUtils.getBrokerInfo(zkClient, brokerId)
     val endpoint = brokerInfo.get.endPoints.get(SecurityProtocol.PLAINTEXT).get

View File

@@ -43,6 +43,7 @@ class HighwatermarkPersistenceTest {
     CoreUtils.rm(dir)
   }
+  @Test
   def testHighWatermarkPersistenceSinglePartition() {
     // mock zkclient
     val zkClient = EasyMock.createMock(classOf[ZkClient])
@@ -78,6 +79,7 @@ class HighwatermarkPersistenceTest {
     replicaManager.shutdown(false)
   }
+  @Test
   def testHighWatermarkPersistenceMultiplePartitions() {
     val topic1 = "foo1"
     val topic2 = "foo2"

View File

@@ -18,7 +18,7 @@ package kafka.server
 import java.util.Properties
-import org.junit.{Before, After}
+import org.junit.{Test, Before, After}
 import collection.mutable.HashMap
 import collection.mutable.Map
 import kafka.cluster.{Partition, Replica}
@@ -59,6 +59,7 @@ class IsrExpirationTest {
   /*
   * Test the case where a follower is caught up but stops making requests to the leader. Once beyond the configured time limit, it should fall out of ISR
   */
+  @Test
   def testIsrExpirationForStuckFollowers() {
     val log = getLogWithLogEndOffset(15L, 2) // set logEndOffset for leader to 15L
@@ -89,6 +90,7 @@ class IsrExpirationTest {
   /*
   * Test the case where a follower never makes a fetch request. It should fall out of ISR because it will be declared stuck
   */
+  @Test
   def testIsrExpirationIfNoFetchRequestMade() {
     val log = getLogWithLogEndOffset(15L, 1) // set logEndOffset for leader to 15L
@@ -109,6 +111,7 @@ class IsrExpirationTest {
   * Test the case where a follower continually makes fetch requests but is unable to catch up. It should fall out of the ISR
   * However, any time it makes a request to the LogEndOffset it should be back in the ISR
   */
+  @Test
   def testIsrExpirationForSlowFollowers() {
     // create leader replica
     val log = getLogWithLogEndOffset(15L, 4)

View File

@@ -26,7 +26,7 @@ import kafka.controller.{ControllerChannelManager, ControllerContext, LeaderIsrA
 import kafka.utils.TestUtils._
 import kafka.zk.ZooKeeperTestHarness
 import org.apache.kafka.common.protocol.SecurityProtocol
-import org.junit.{After, Before}
+import org.junit.{Test, After, Before}
 class LeaderElectionTest extends ZooKeeperTestHarness {
   val brokerId1 = 0
@@ -56,6 +56,7 @@ class LeaderElectionTest extends ZooKeeperTestHarness {
     super.tearDown()
   }
+  @Test
   def testLeaderElectionAndEpoch {
     // start 2 brokers
     val topic = "new-topic"
@@ -101,6 +102,7 @@ class LeaderElectionTest extends ZooKeeperTestHarness {
     assertEquals("Second epoch value should be %d".format(leaderEpoch2+1) , leaderEpoch2+1, leaderEpoch3)
   }
+  @Test
   def testLeaderElectionWithStaleControllerEpoch() {
     // start 2 brokers
     val topic = "new-topic"

View File

@@ -27,7 +27,7 @@ import kafka.serializer.StringEncoder
 import java.io.File
-import org.junit.{After, Before}
+import org.junit.{Test, After, Before}
 import org.junit.Assert._
 class LogRecoveryTest extends ZooKeeperTestHarness {
@@ -97,6 +97,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
     super.tearDown()
   }
+  @Test
   def testHWCheckpointNoFailuresSingleLogSegment {
     val numMessages = 2L
     sendMessages(numMessages.toInt)
@@ -113,6 +114,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
     assertEquals(numMessages, followerHW)
   }
+  @Test
   def testHWCheckpointWithFailuresSingleLogSegment {
     var leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId)
@@ -163,6 +165,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
     assertEquals(hw, hwFile2.read.getOrElse(TopicAndPartition(topic, 0), 0L))
   }
+  @Test
   def testHWCheckpointNoFailuresMultipleLogSegments {
     sendMessages(20)
     val hw = 20L
@@ -178,6 +181,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
     assertEquals(hw, followerHW)
   }
+  @Test
   def testHWCheckpointWithFailuresMultipleLogSegments {
     var leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId)

View File

@@ -17,7 +17,7 @@
 package kafka.server
-import org.junit.{After, Before}
+import org.junit.{Test, After, Before}
 import kafka.zk.ZooKeeperTestHarness
 import kafka.utils.TestUtils._
 import kafka.producer.KeyedMessage
@@ -44,6 +44,7 @@ class ReplicaFetchTest extends ZooKeeperTestHarness {
     super.tearDown()
   }
+  @Test
   def testReplicaFetcherThread() {
     val partition = 0
     val testMessageList1 = List("test1", "test2", "test3", "test4")

View File

@@ -159,6 +159,7 @@ class ServerShutdownTest extends ZooKeeperTestHarness {
       .count(isNonDaemonKafkaThread))
   }
+  @Test
   def testConsecutiveShutdown(){
     val server = new KafkaServer(config)
     try {

View File

@@ -23,9 +23,11 @@ import kafka.utils.TestUtils
 import kafka.zk.ZooKeeperTestHarness
 import org.junit.Assert._
+import org.junit.Test
 class ServerStartupTest extends ZooKeeperTestHarness {
+  @Test
   def testBrokerCreatesZKChroot {
     val brokerId = 0
     val zookeeperChroot = "/kafka-chroot-for-unittest"
@@ -41,6 +43,7 @@ class ServerStartupTest extends ZooKeeperTestHarness {
     CoreUtils.rm(server.config.logDirs)
   }
+  @Test
   def testConflictBrokerRegistration {
     // Try starting a broker with the a conflicting broker id.
     // This shouldn't affect the existing broker registration.

View File

@@ -22,7 +22,7 @@ import kafka.cluster.Replica
 import kafka.common.TopicAndPartition
 import kafka.log.Log
 import kafka.message.{MessageSet, ByteBufferMessageSet, Message}
-import org.junit.{After, Before}
+import org.junit.{Test, After, Before}
 import java.util.{Properties, Collections}
 import java.util.concurrent.atomic.AtomicBoolean
@@ -136,6 +136,7 @@ class SimpleFetchTest {
   *
   * This test also verifies counts of fetch requests recorded by the ReplicaManager
   */
+  @Test
   def testReadFromLog() {
     val initialTopicCount = BrokerTopicStats.getBrokerTopicStats(topic).totalFetchRequestRate.count();
     val initialAllTopicsCount = BrokerTopicStats.getBrokerAllTopicsStats().totalFetchRequestRate.count();

View File

@@ -20,11 +20,12 @@ package kafka.zk
 import kafka.consumer.ConsumerConfig
 import kafka.utils.ZkUtils
 import kafka.utils.TestUtils
-import org.junit.Assert
+import org.junit.{Test, Assert}
 class ZKEphemeralTest extends ZooKeeperTestHarness {
   var zkSessionTimeoutMs = 1000
+  @Test
   def testEphemeralNodeCleanup = {
     val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1"))
     var zkClient = ZkUtils.createZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs)

View File

@@ -21,6 +21,7 @@ import kafka.consumer.ConsumerConfig
 import kafka.utils.{ZkPath, TestUtils, ZkUtils}
 import org.apache.kafka.common.config.ConfigException
 import org.junit.Assert._
+import org.junit.Test
 class ZKPathTest extends ZooKeeperTestHarness {
@@ -28,6 +29,7 @@ class ZKPathTest extends ZooKeeperTestHarness {
   val zkSessionTimeoutMs = 1000
   def zkConnectWithInvalidRoot: String = zkConnect + "/ghost"
+  @Test
   def testCreatePersistentPathThrowsException {
     val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnectWithInvalidRoot,
       "test", "1"))
@@ -43,6 +45,7 @@ class ZKPathTest extends ZooKeeperTestHarness {
     }
   }
+  @Test
   def testCreatePersistentPath {
     val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1"))
     var zkClient = ZkUtils.createZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs)
@@ -56,6 +59,7 @@ class ZKPathTest extends ZooKeeperTestHarness {
     assertTrue("Failed to create persistent path", ZkUtils.pathExists(zkClient, path))
   }
+  @Test
   def testMakeSurePersistsPathExistsThrowsException {
     val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnectWithInvalidRoot,
       "test", "1"))
@@ -71,6 +75,7 @@ class ZKPathTest extends ZooKeeperTestHarness {
     }
   }
+  @Test
   def testMakeSurePersistsPathExists {
     val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1"))
     var zkClient = ZkUtils.createZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs)
@@ -84,6 +89,7 @@ class ZKPathTest extends ZooKeeperTestHarness {
     assertTrue("Failed to create persistent path", ZkUtils.pathExists(zkClient, path))
   }
+  @Test
   def testCreateEphemeralPathThrowsException {
     val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnectWithInvalidRoot,
       "test", "1"))
@@ -99,6 +105,7 @@ class ZKPathTest extends ZooKeeperTestHarness {
     }
   }
+  @Test
   def testCreateEphemeralPathExists {
     val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1"))
     var zkClient = ZkUtils.createZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs)
@@ -112,6 +119,7 @@ class ZKPathTest extends ZooKeeperTestHarness {
     assertTrue("Failed to create ephemeral path", ZkUtils.pathExists(zkClient, path))
   }
+  @Test
   def testCreatePersistentSequentialThrowsException {
     val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnectWithInvalidRoot,
       "test", "1"))
@@ -127,6 +135,7 @@ class ZKPathTest extends ZooKeeperTestHarness {
     }
   }
+  @Test
   def testCreatePersistentSequentialExists {
     val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1"))
     var zkClient = ZkUtils.createZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs)