KAFKA-1782: Follow up - add missing @Test annotations.

This commit is contained in:
Ewen Cheslack-Postava 2015-08-14 15:43:56 -07:00
parent c8e62c9818
commit 1dcaf39d48
24 changed files with 104 additions and 18 deletions

View File

@ -19,7 +19,7 @@ import org.apache.kafka.clients.consumer._
import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
import org.apache.kafka.common.TopicPartition
import org.junit.Assert._
import org.junit.Before
import org.junit.{Test, Before}
import scala.collection.JavaConversions._
@ -61,6 +61,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
TestUtils.createTopic(this.zkClient, topic, 1, serverCount, this.servers)
}
@Test
def testConsumptionWithBrokerFailures() = consumeWithBrokerFailures(10)
/*
@ -96,6 +97,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
scheduler.shutdown()
}
@Test
def testSeekAndCommitWithBrokerFailures() = seekAndCommitWithBrokerFailures(5)
def seekAndCommitWithBrokerFailures(numIters: Int) {

View File

@ -25,7 +25,7 @@ import kafka.server.KafkaConfig
import java.util.ArrayList
import org.junit.Assert._
import org.junit.Before
import org.junit.{Test, Before}
import scala.collection.JavaConverters._
import kafka.coordinator.ConsumerCoordinator
@ -65,6 +65,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
TestUtils.createTopic(this.zkClient, topic, 2, serverCount, this.servers)
}
@Test
def testSimpleConsumption() {
val numRecords = 10000
sendRecords(numRecords)
@ -86,6 +87,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
awaitCommitCallback(this.consumers(0), commitCallback)
}
@Test
def testCommitSpecifiedOffsets() {
sendRecords(5, tp)
sendRecords(7, tp2)
@ -116,12 +118,14 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
assertEquals(7, this.consumers(0).committed(tp2))
}
@Test
def testAutoOffsetReset() {
sendRecords(1)
this.consumers(0).subscribe(tp)
consumeRecords(this.consumers(0), numRecords = 1, startingOffset = 0)
}
@Test
def testSeek() {
val consumer = this.consumers(0)
val totalRecords = 50L
@ -142,12 +146,14 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
consumeRecords(consumer, numRecords = 1, startingOffset = mid.toInt)
}
@Test
def testGroupConsumption() {
sendRecords(10)
this.consumers(0).subscribe(topic)
consumeRecords(this.consumers(0), numRecords = 1, startingOffset = 0)
}
@Test
def testPositionAndCommit() {
sendRecords(5)
@ -179,6 +185,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
consumeRecords(this.consumers(1), 1, 5)
}
@Test
def testPartitionsFor() {
val numParts = 2
TestUtils.createTopic(this.zkClient, "part-test", numParts, 1, this.servers)
@ -188,6 +195,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
assertNull(this.consumers(0).partitionsFor("non-exist-topic"))
}
@Test
def testListTopics() {
val numParts = 2
val topic1: String = "part-test-topic-1"
@ -206,6 +214,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
assertEquals(2, topics.get(topic3).size)
}
@Test
def testPartitionReassignmentCallback() {
val callback = new TestConsumerReassignmentCallback()
this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100"); // timeout quickly to avoid slow test
@ -238,6 +247,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
consumer0.close()
}
@Test
def testUnsubscribeTopic() {
val callback = new TestConsumerReassignmentCallback()
this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100"); // timeout quickly to avoid slow test
@ -258,6 +268,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
}
}
@Test
def testExpandingTopicSubscriptions() {
val otherTopic = "other"
val subscriptions = Set(new TopicPartition(topic, 0), new TopicPartition(topic, 1))
@ -276,6 +287,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
}, s"Expected partitions ${expandedSubscriptions.asJava} but actually got ${this.consumers(0).subscriptions}")
}
@Test
def testShrinkingTopicSubscriptions() {
val otherTopic = "other"
TestUtils.createTopic(this.zkClient, otherTopic, 2, serverCount, this.servers)
@ -294,6 +306,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
}, s"Expected partitions ${shrunkenSubscriptions.asJava} but actually got ${this.consumers(0).subscriptions}")
}
@Test
def testPartitionPauseAndResume() {
sendRecords(5)
this.consumers(0).subscribe(tp)
@ -305,6 +318,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
consumeRecords(this.consumers(0), 5, 5)
}
@Test
def testPauseStateNotPreservedByRebalance() {
this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100"); // timeout quickly to avoid slow test
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "30");

View File

@ -25,7 +25,7 @@ import kafka.utils.{ZkUtils, CoreUtils, TestUtils}
import kafka.cluster.Broker
import kafka.client.ClientUtils
import kafka.server.{KafkaConfig, KafkaServer}
import org.junit.{After, Before}
import org.junit.{Test, After, Before}
class AddPartitionsTest extends ZooKeeperTestHarness {
var configs: Seq[KafkaConfig] = null
@ -62,6 +62,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
super.tearDown()
}
@Test
def testTopicDoesNotExist {
try {
AdminUtils.addPartitions(zkClient, "Blah", 1)
@ -72,6 +73,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
}
}
@Test
def testWrongReplicaCount {
try {
AdminUtils.addPartitions(zkClient, topic1, 2, "0:1,0:1:2")
@ -82,6 +84,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
}
}
@Test
def testIncrementPartitions {
AdminUtils.addPartitions(zkClient, topic1, 3)
// wait until leader is elected
@ -107,6 +110,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
assert(replicas.contains(partitionDataForTopic1(1).leader.get))
}
@Test
def testManualAssignmentOfReplicas {
AdminUtils.addPartitions(zkClient, topic2, 3, "1:2,0:1,2:3")
// wait until leader is elected
@ -133,6 +137,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
assert(replicas(1).id == 0 || replicas(1).id == 1)
}
@Test
def testReplicaPlacement {
AdminUtils.addPartitions(zkClient, topic3, 7)

View File

@ -26,9 +26,11 @@ import kafka.common.TopicAndPartition
import kafka.consumer.PartitionAssignorTest.StaticSubscriptionInfo
import kafka.consumer.PartitionAssignorTest.Scenario
import kafka.consumer.PartitionAssignorTest.WildcardSubscriptionInfo
import org.junit.Test
class PartitionAssignorTest extends Logging {
@Test
def testRoundRobinPartitionAssignor() {
val assignor = new RoundRobinAssignor
@ -52,6 +54,7 @@ class PartitionAssignorTest extends Logging {
})
}
@Test
def testRangePartitionAssignor() {
val assignor = new RangeAssignor
(1 to PartitionAssignorTest.TestCaseCount).foreach (testCase => {

View File

@ -30,7 +30,7 @@ import kafka.utils.TestUtils._
import kafka.utils._
import org.I0Itec.zkclient.ZkClient
import org.apache.log4j.{Level, Logger}
import org.junit.{After, Before}
import org.junit.{Test, After, Before}
import scala.collection._
@ -65,6 +65,7 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
super.tearDown()
}
@Test
def testBasic() {
val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
requestHandlerLogger.setLevel(Level.FATAL)
@ -175,7 +176,7 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
requestHandlerLogger.setLevel(Level.ERROR)
}
@Test
def testCompression() {
val requestHandlerLogger = Logger.getLogger(classOf[kafka.server.KafkaRequestHandler])
requestHandlerLogger.setLevel(Level.FATAL)
@ -255,6 +256,7 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
requestHandlerLogger.setLevel(Level.ERROR)
}
@Test
def testCompressionSetConsumption() {
// send some messages to each broker
val sentMessages = sendMessages(servers, topic, 200, 0, DefaultCompressionCodec) ++
@ -278,6 +280,7 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
zkConsumerConnector1.shutdown
}
@Test
def testConsumerDecoder() {
val requestHandlerLogger = Logger.getLogger(classOf[kafka.server.KafkaRequestHandler])
requestHandlerLogger.setLevel(Level.FATAL)
@ -317,6 +320,7 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
requestHandlerLogger.setLevel(Level.ERROR)
}
@Test
def testLeaderSelectionForPartition() {
val zkClient = ZkUtils.createZkClient(zkConnect, 6000, 30000)
@ -348,6 +352,7 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
zkClient.close()
}
@Test
def testConsumerRebalanceListener() {
// Send messages to create topic
sendMessages(servers, topic, nMessages, 0)

View File

@ -66,6 +66,7 @@ class ControllerFailoverTest extends KafkaServerTestHarness with Logging {
* See @link{https://issues.apache.org/jira/browse/KAFKA-2300}
* for the background of this test case
*/
@Test
def testMetadataUpdate() {
log.setLevel(Level.INFO)
var controller: KafkaServer = this.servers.head;

View File

@ -19,7 +19,7 @@ package kafka.integration
import java.util.concurrent._
import java.util.concurrent.atomic._
import org.junit.{After, Before}
import org.junit.{Test, After, Before}
import scala.collection._
import org.junit.Assert._
@ -65,6 +65,7 @@ class FetcherTest extends KafkaServerTestHarness {
super.tearDown
}
@Test
def testFetcher() {
val perNode = 2
var count = TestUtils.sendMessages(servers, topic, perNode).size

View File

@ -21,6 +21,7 @@ import java.util.Properties
import kafka.server.KafkaConfig
import kafka.utils.TestUtils
import org.junit.Test
class MinIsrConfigTest extends KafkaServerTestHarness {
@ -28,6 +29,7 @@ class MinIsrConfigTest extends KafkaServerTestHarness {
overridingProps.put(KafkaConfig.MinInSyncReplicasProp, "5")
def generateConfigs() = TestUtils.createBrokerConfigs(1, zkConnect).map(KafkaConfig.fromProps(_, overridingProps))
@Test
def testDefaultKafkaConfig() {
assert(servers.head.getLogManager().defaultConfig.minInSyncReplicas == 5)
}

View File

@ -24,6 +24,7 @@ import kafka.server.{KafkaRequestHandler, KafkaConfig}
import kafka.producer.{KeyedMessage, Producer}
import org.apache.log4j.{Level, Logger}
import kafka.zk.ZooKeeperTestHarness
import org.junit.Test
import scala.collection._
import kafka.common.{TopicAndPartition, ErrorMapping, UnknownTopicOrPartitionException, OffsetOutOfRangeException}
import kafka.utils.{StaticPartitioner, TestUtils, CoreUtils}
@ -38,6 +39,7 @@ class PrimitiveApiTest extends ProducerConsumerTestHarness with ZooKeeperTestHar
def generateConfigs() = List(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect)))
@Test
def testFetchRequestCanProperlySerialize() {
val request = new FetchRequestBuilder()
.clientId("test-client")
@ -54,6 +56,7 @@ class PrimitiveApiTest extends ProducerConsumerTestHarness with ZooKeeperTestHar
assertEquals(request, deserializedRequest)
}
@Test
def testEmptyFetchRequest() {
val partitionRequests = immutable.Map[TopicAndPartition, PartitionFetchInfo]()
val request = new FetchRequest(requestInfo = partitionRequests)
@ -61,6 +64,7 @@ class PrimitiveApiTest extends ProducerConsumerTestHarness with ZooKeeperTestHar
assertTrue(!fetched.hasError && fetched.data.size == 0)
}
@Test
def testDefaultEncoderProducerAndFetch() {
val topic = "test-topic"
@ -84,6 +88,7 @@ class PrimitiveApiTest extends ProducerConsumerTestHarness with ZooKeeperTestHar
assertEquals("test-message", TestUtils.readString(fetchedMessageAndOffset.message.payload, "UTF-8"))
}
@Test
def testDefaultEncoderProducerAndFetchWithCompression() {
val topic = "test-topic"
val props = new Properties()
@ -170,6 +175,7 @@ class PrimitiveApiTest extends ProducerConsumerTestHarness with ZooKeeperTestHar
requestHandlerLogger.setLevel(Level.ERROR)
}
@Test
def testProduceAndMultiFetch() {
produceAndMultiFetch(producer)
}
@ -196,10 +202,12 @@ class PrimitiveApiTest extends ProducerConsumerTestHarness with ZooKeeperTestHar
}
}
@Test
def testMultiProduce() {
multiProduce(producer)
}
@Test
def testConsumerEmptyTopic() {
val newTopic = "new-topic"
TestUtils.createTopic(zkClient, newTopic, numPartitions = 1, replicationFactor = 1, servers = servers)
@ -208,6 +216,7 @@ class PrimitiveApiTest extends ProducerConsumerTestHarness with ZooKeeperTestHar
assertFalse(fetchResponse.messageSet(newTopic, 0).iterator.hasNext)
}
@Test
def testPipelinedProduceRequests() {
val topics = Map("test4" -> 0, "test1" -> 0, "test2" -> 0, "test3" -> 0)
topics.keys.map(topic => TestUtils.createTopic(zkClient, topic, servers = servers))

View File

@ -17,7 +17,7 @@
package kafka.integration
import org.junit.{After, Before}
import org.junit.{Test, After, Before}
import kafka.zk.ZooKeeperTestHarness
import kafka.utils.TestUtils._
import org.junit.Assert._
@ -47,6 +47,7 @@ class RollingBounceTest extends ZooKeeperTestHarness {
super.tearDown()
}
@Test
def testRollingBounce {
// start all the brokers
val topic1 = "new-topic1"

View File

@ -30,7 +30,7 @@ import kafka.utils.TestUtils._
import kafka.zk.ZooKeeperTestHarness
import org.apache.kafka.common.protocol.SecurityProtocol
import org.junit.Assert._
import org.junit.{After, Before}
import org.junit.{Test, After, Before}
class TopicMetadataTest extends ZooKeeperTestHarness {
private var server1: KafkaServer = null
@ -54,6 +54,7 @@ class TopicMetadataTest extends ZooKeeperTestHarness {
super.tearDown()
}
@Test
def testTopicMetadataRequest {
// create topic
val topic = "test"
@ -70,6 +71,7 @@ class TopicMetadataTest extends ZooKeeperTestHarness {
assertEquals(topicMetadataRequest, deserializedMetadataRequest)
}
@Test
def testBasicTopicMetadata {
// create topic
val topic = "test"
@ -87,6 +89,7 @@ class TopicMetadataTest extends ZooKeeperTestHarness {
assertEquals(1, partitionMetadata.head.replicas.size)
}
@Test
def testGetAllTopicMetadata {
// create topic
val topic1 = "testGetAllTopicMetadata1"
@ -111,6 +114,7 @@ class TopicMetadataTest extends ZooKeeperTestHarness {
assertEquals(1, partitionMetadataTopic2.head.replicas.size)
}
@Test
def testAutoCreateTopic {
// auto create topic
val topic = "testAutoCreateTopic"
@ -137,6 +141,7 @@ class TopicMetadataTest extends ZooKeeperTestHarness {
assertTrue(partitionMetadata.head.leader.isDefined)
}
@Test
def testAutoCreateTopicWithCollision {
// auto create topic
val topic1 = "testAutoCreate_Topic"
@ -199,7 +204,7 @@ class TopicMetadataTest extends ZooKeeperTestHarness {
})
}
@Test
def testIsrAfterBrokerShutDownAndJoinsBack {
val numBrokers = 2 //just 2 brokers are enough for the test
@ -250,10 +255,12 @@ class TopicMetadataTest extends ZooKeeperTestHarness {
}
@Test
def testAliveBrokerListWithNoTopics {
checkMetadata(Seq(server1), 1)
}
@Test
def testAliveBrokersListWithNoTopicsAfterNewBrokerStartup {
var adHocServers = adHocConfigs.takeRight(adHocConfigs.size - 1).map(p => createServer(p))
@ -267,6 +274,7 @@ class TopicMetadataTest extends ZooKeeperTestHarness {
}
@Test
def testAliveBrokersListWithNoTopicsAfterABrokerShutdown {
val adHocServers = adHocConfigs.map(p => createServer(p))

View File

@ -18,7 +18,7 @@
package kafka.integration
import org.apache.kafka.common.config.ConfigException
import org.junit.{After, Before}
import org.junit.{Test, After, Before}
import scala.util.Random
import org.apache.log4j.{Level, Logger}
@ -99,6 +99,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
}
}
@Test
def testUncleanLeaderElectionEnabled {
// unclean leader election is enabled by default
startBrokers(Seq(configProps1, configProps2))
@ -109,6 +110,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
verifyUncleanLeaderElectionEnabled
}
@Test
def testUncleanLeaderElectionDisabled {
// disable unclean leader election
configProps1.put("unclean.leader.election.enable", String.valueOf(false))
@ -121,6 +123,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
verifyUncleanLeaderElectionDisabled
}
@Test
def testUncleanLeaderElectionEnabledByTopicOverride {
// disable unclean leader election globally, but enable for our specific test topic
configProps1.put("unclean.leader.election.enable", String.valueOf(false))
@ -136,6 +139,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
verifyUncleanLeaderElectionEnabled
}
@Test
def testCleanLeaderElectionDisabledByTopicOverride {
// enable unclean leader election globally, but disable for our specific test topic
configProps1.put("unclean.leader.election.enable", String.valueOf(true))
@ -151,6 +155,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
verifyUncleanLeaderElectionDisabled
}
@Test
def testUncleanLeaderElectionInvalidTopicOverride {
startBrokers(Seq(configProps1))

View File

@ -29,6 +29,7 @@ import kafka.utils.{Logging, TestUtils}
import kafka.consumer.{KafkaStream, ConsumerConfig}
import kafka.zk.ZooKeeperTestHarness
import kafka.common.MessageStreamsExistException
import org.junit.Test
import scala.collection.JavaConversions
@ -50,6 +51,7 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with ZooKeep
val consumer1 = "consumer1"
val nMessages = 2
@Test
def testBasic() {
val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
requestHandlerLogger.setLevel(Level.FATAL)

View File

@ -21,7 +21,7 @@ import org.junit.Assert._
import kafka.utils.{TestUtils, CoreUtils, ZkUtils}
import kafka.zk.ZooKeeperTestHarness
import org.apache.kafka.common.protocol.SecurityProtocol
import org.junit.{After, Before}
import org.junit.{Test, After, Before}
class AdvertiseBrokerTest extends ZooKeeperTestHarness {
var server : KafkaServer = null
@ -46,7 +46,8 @@ class AdvertiseBrokerTest extends ZooKeeperTestHarness {
CoreUtils.rm(server.config.logDirs)
super.tearDown()
}
@Test
def testBrokerAdvertiseToZK {
val brokerInfo = ZkUtils.getBrokerInfo(zkClient, brokerId)
val endpoint = brokerInfo.get.endPoints.get(SecurityProtocol.PLAINTEXT).get

View File

@ -43,6 +43,7 @@ class HighwatermarkPersistenceTest {
CoreUtils.rm(dir)
}
@Test
def testHighWatermarkPersistenceSinglePartition() {
// mock zkclient
val zkClient = EasyMock.createMock(classOf[ZkClient])
@ -78,6 +79,7 @@ class HighwatermarkPersistenceTest {
replicaManager.shutdown(false)
}
@Test
def testHighWatermarkPersistenceMultiplePartitions() {
val topic1 = "foo1"
val topic2 = "foo2"

View File

@ -18,7 +18,7 @@ package kafka.server
import java.util.Properties
import org.junit.{Before, After}
import org.junit.{Test, Before, After}
import collection.mutable.HashMap
import collection.mutable.Map
import kafka.cluster.{Partition, Replica}
@ -59,6 +59,7 @@ class IsrExpirationTest {
/*
* Test the case where a follower is caught up but stops making requests to the leader. Once beyond the configured time limit, it should fall out of ISR
*/
@Test
def testIsrExpirationForStuckFollowers() {
val log = getLogWithLogEndOffset(15L, 2) // set logEndOffset for leader to 15L
@ -89,6 +90,7 @@ class IsrExpirationTest {
/*
* Test the case where a follower never makes a fetch request. It should fall out of ISR because it will be declared stuck
*/
@Test
def testIsrExpirationIfNoFetchRequestMade() {
val log = getLogWithLogEndOffset(15L, 1) // set logEndOffset for leader to 15L
@ -109,6 +111,7 @@ class IsrExpirationTest {
* Test the case where a follower continually makes fetch requests but is unable to catch up. It should fall out of the ISR
* However, any time it makes a request to the LogEndOffset it should be back in the ISR
*/
@Test
def testIsrExpirationForSlowFollowers() {
// create leader replica
val log = getLogWithLogEndOffset(15L, 4)

View File

@ -26,7 +26,7 @@ import kafka.controller.{ControllerChannelManager, ControllerContext, LeaderIsrA
import kafka.utils.TestUtils._
import kafka.zk.ZooKeeperTestHarness
import org.apache.kafka.common.protocol.SecurityProtocol
import org.junit.{After, Before}
import org.junit.{Test, After, Before}
class LeaderElectionTest extends ZooKeeperTestHarness {
val brokerId1 = 0
@ -56,6 +56,7 @@ class LeaderElectionTest extends ZooKeeperTestHarness {
super.tearDown()
}
@Test
def testLeaderElectionAndEpoch {
// start 2 brokers
val topic = "new-topic"
@ -101,6 +102,7 @@ class LeaderElectionTest extends ZooKeeperTestHarness {
assertEquals("Second epoch value should be %d".format(leaderEpoch2+1) , leaderEpoch2+1, leaderEpoch3)
}
@Test
def testLeaderElectionWithStaleControllerEpoch() {
// start 2 brokers
val topic = "new-topic"

View File

@ -27,7 +27,7 @@ import kafka.serializer.StringEncoder
import java.io.File
import org.junit.{After, Before}
import org.junit.{Test, After, Before}
import org.junit.Assert._
class LogRecoveryTest extends ZooKeeperTestHarness {
@ -97,6 +97,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
super.tearDown()
}
@Test
def testHWCheckpointNoFailuresSingleLogSegment {
val numMessages = 2L
sendMessages(numMessages.toInt)
@ -113,6 +114,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
assertEquals(numMessages, followerHW)
}
@Test
def testHWCheckpointWithFailuresSingleLogSegment {
var leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId)
@ -163,6 +165,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
assertEquals(hw, hwFile2.read.getOrElse(TopicAndPartition(topic, 0), 0L))
}
@Test
def testHWCheckpointNoFailuresMultipleLogSegments {
sendMessages(20)
val hw = 20L
@ -178,6 +181,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
assertEquals(hw, followerHW)
}
@Test
def testHWCheckpointWithFailuresMultipleLogSegments {
var leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId)

View File

@ -17,7 +17,7 @@
package kafka.server
import org.junit.{After, Before}
import org.junit.{Test, After, Before}
import kafka.zk.ZooKeeperTestHarness
import kafka.utils.TestUtils._
import kafka.producer.KeyedMessage
@ -44,6 +44,7 @@ class ReplicaFetchTest extends ZooKeeperTestHarness {
super.tearDown()
}
@Test
def testReplicaFetcherThread() {
val partition = 0
val testMessageList1 = List("test1", "test2", "test3", "test4")

View File

@ -159,6 +159,7 @@ class ServerShutdownTest extends ZooKeeperTestHarness {
.count(isNonDaemonKafkaThread))
}
@Test
def testConsecutiveShutdown(){
val server = new KafkaServer(config)
try {

View File

@ -23,9 +23,11 @@ import kafka.utils.TestUtils
import kafka.zk.ZooKeeperTestHarness
import org.junit.Assert._
import org.junit.Test
class ServerStartupTest extends ZooKeeperTestHarness {
@Test
def testBrokerCreatesZKChroot {
val brokerId = 0
val zookeeperChroot = "/kafka-chroot-for-unittest"
@ -41,6 +43,7 @@ class ServerStartupTest extends ZooKeeperTestHarness {
CoreUtils.rm(server.config.logDirs)
}
@Test
def testConflictBrokerRegistration {
// Try starting a broker with the a conflicting broker id.
// This shouldn't affect the existing broker registration.

View File

@ -22,7 +22,7 @@ import kafka.cluster.Replica
import kafka.common.TopicAndPartition
import kafka.log.Log
import kafka.message.{MessageSet, ByteBufferMessageSet, Message}
import org.junit.{After, Before}
import org.junit.{Test, After, Before}
import java.util.{Properties, Collections}
import java.util.concurrent.atomic.AtomicBoolean
@ -136,6 +136,7 @@ class SimpleFetchTest {
*
* This test also verifies counts of fetch requests recorded by the ReplicaManager
*/
@Test
def testReadFromLog() {
val initialTopicCount = BrokerTopicStats.getBrokerTopicStats(topic).totalFetchRequestRate.count();
val initialAllTopicsCount = BrokerTopicStats.getBrokerAllTopicsStats().totalFetchRequestRate.count();

View File

@ -20,11 +20,12 @@ package kafka.zk
import kafka.consumer.ConsumerConfig
import kafka.utils.ZkUtils
import kafka.utils.TestUtils
import org.junit.Assert
import org.junit.{Test, Assert}
class ZKEphemeralTest extends ZooKeeperTestHarness {
var zkSessionTimeoutMs = 1000
@Test
def testEphemeralNodeCleanup = {
val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1"))
var zkClient = ZkUtils.createZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs)

View File

@ -21,6 +21,7 @@ import kafka.consumer.ConsumerConfig
import kafka.utils.{ZkPath, TestUtils, ZkUtils}
import org.apache.kafka.common.config.ConfigException
import org.junit.Assert._
import org.junit.Test
class ZKPathTest extends ZooKeeperTestHarness {
@ -28,6 +29,7 @@ class ZKPathTest extends ZooKeeperTestHarness {
val zkSessionTimeoutMs = 1000
def zkConnectWithInvalidRoot: String = zkConnect + "/ghost"
@Test
def testCreatePersistentPathThrowsException {
val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnectWithInvalidRoot,
"test", "1"))
@ -43,6 +45,7 @@ class ZKPathTest extends ZooKeeperTestHarness {
}
}
@Test
def testCreatePersistentPath {
val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1"))
var zkClient = ZkUtils.createZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs)
@ -56,6 +59,7 @@ class ZKPathTest extends ZooKeeperTestHarness {
assertTrue("Failed to create persistent path", ZkUtils.pathExists(zkClient, path))
}
@Test
def testMakeSurePersistsPathExistsThrowsException {
val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnectWithInvalidRoot,
"test", "1"))
@ -71,6 +75,7 @@ class ZKPathTest extends ZooKeeperTestHarness {
}
}
@Test
def testMakeSurePersistsPathExists {
val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1"))
var zkClient = ZkUtils.createZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs)
@ -84,6 +89,7 @@ class ZKPathTest extends ZooKeeperTestHarness {
assertTrue("Failed to create persistent path", ZkUtils.pathExists(zkClient, path))
}
@Test
def testCreateEphemeralPathThrowsException {
val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnectWithInvalidRoot,
"test", "1"))
@ -99,6 +105,7 @@ class ZKPathTest extends ZooKeeperTestHarness {
}
}
@Test
def testCreateEphemeralPathExists {
val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1"))
var zkClient = ZkUtils.createZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs)
@ -112,6 +119,7 @@ class ZKPathTest extends ZooKeeperTestHarness {
assertTrue("Failed to create ephemeral path", ZkUtils.pathExists(zkClient, path))
}
@Test
def testCreatePersistentSequentialThrowsException {
val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnectWithInvalidRoot,
"test", "1"))
@ -127,6 +135,7 @@ class ZKPathTest extends ZooKeeperTestHarness {
}
}
@Test
def testCreatePersistentSequentialExists {
val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1"))
var zkClient = ZkUtils.createZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs)