diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala index cf65f1214b9..2c6ee232f46 100644 --- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala +++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala @@ -19,7 +19,7 @@ import org.apache.kafka.clients.consumer._ import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord} import org.apache.kafka.common.TopicPartition import org.junit.Assert._ -import org.junit.Before +import org.junit.{Test, Before} import scala.collection.JavaConversions._ @@ -61,6 +61,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging { TestUtils.createTopic(this.zkClient, topic, 1, serverCount, this.servers) } + @Test def testConsumptionWithBrokerFailures() = consumeWithBrokerFailures(10) /* @@ -96,6 +97,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging { scheduler.shutdown() } + @Test def testSeekAndCommitWithBrokerFailures() = seekAndCommitWithBrokerFailures(5) def seekAndCommitWithBrokerFailures(numIters: Int) { diff --git a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala index b46070a5f0c..9e8172a07ed 100644 --- a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala @@ -25,7 +25,7 @@ import kafka.server.KafkaConfig import java.util.ArrayList import org.junit.Assert._ -import org.junit.Before +import org.junit.{Test, Before} import scala.collection.JavaConverters._ import kafka.coordinator.ConsumerCoordinator @@ -65,6 +65,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging { TestUtils.createTopic(this.zkClient, topic, 2, serverCount, this.servers) } + @Test def testSimpleConsumption() { val numRecords = 10000 sendRecords(numRecords) @@ -86,6 +87,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging { awaitCommitCallback(this.consumers(0), commitCallback) } + @Test def testCommitSpecifiedOffsets() { sendRecords(5, tp) sendRecords(7, tp2) @@ -116,12 +118,14 @@ class ConsumerTest extends IntegrationTestHarness with Logging { assertEquals(7, this.consumers(0).committed(tp2)) } + @Test def testAutoOffsetReset() { sendRecords(1) this.consumers(0).subscribe(tp) consumeRecords(this.consumers(0), numRecords = 1, startingOffset = 0) } + @Test def testSeek() { val consumer = this.consumers(0) val totalRecords = 50L @@ -142,12 +146,14 @@ class ConsumerTest extends IntegrationTestHarness with Logging { consumeRecords(consumer, numRecords = 1, startingOffset = mid.toInt) } + @Test def testGroupConsumption() { sendRecords(10) this.consumers(0).subscribe(topic) consumeRecords(this.consumers(0), numRecords = 1, startingOffset = 0) } + @Test def testPositionAndCommit() { sendRecords(5) @@ -179,6 +185,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging { consumeRecords(this.consumers(1), 1, 5) } + @Test def testPartitionsFor() { val numParts = 2 TestUtils.createTopic(this.zkClient, "part-test", numParts, 1, this.servers) @@ -188,6 +195,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging { assertNull(this.consumers(0).partitionsFor("non-exist-topic")) } + @Test def testListTopics() { val numParts = 2 val topic1: String = "part-test-topic-1" @@ -206,6 +214,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging { assertEquals(2, topics.get(topic3).size) } + @Test def testPartitionReassignmentCallback() { val callback = new TestConsumerReassignmentCallback() this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100"); // timeout quickly to avoid slow test @@ -238,6 +247,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging { consumer0.close() } + @Test def testUnsubscribeTopic() { val callback = new TestConsumerReassignmentCallback() this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100"); // timeout quickly to avoid slow test @@ -258,6 +268,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging { } } + @Test def testExpandingTopicSubscriptions() { val otherTopic = "other" val subscriptions = Set(new TopicPartition(topic, 0), new TopicPartition(topic, 1)) @@ -276,6 +287,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging { }, s"Expected partitions ${expandedSubscriptions.asJava} but actually got ${this.consumers(0).subscriptions}") } + @Test def testShrinkingTopicSubscriptions() { val otherTopic = "other" TestUtils.createTopic(this.zkClient, otherTopic, 2, serverCount, this.servers) @@ -294,6 +306,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging { }, s"Expected partitions ${shrunkenSubscriptions.asJava} but actually got ${this.consumers(0).subscriptions}") } + @Test def testPartitionPauseAndResume() { sendRecords(5) this.consumers(0).subscribe(tp) @@ -305,6 +318,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging { consumeRecords(this.consumers(0), 5, 5) } + @Test def testPauseStateNotPreservedByRebalance() { this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100"); // timeout quickly to avoid slow test this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "30"); diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala index 08c170bd7a6..05b9a87d691 100755 --- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala @@ -25,7 +25,7 @@ import kafka.utils.{ZkUtils, CoreUtils, TestUtils} import kafka.cluster.Broker import kafka.client.ClientUtils import kafka.server.{KafkaConfig, KafkaServer} -import org.junit.{After, Before} +import org.junit.{Test, After, Before} class AddPartitionsTest extends ZooKeeperTestHarness { var configs: Seq[KafkaConfig] = null @@ -62,6 +62,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness { super.tearDown() } + @Test def testTopicDoesNotExist { try { AdminUtils.addPartitions(zkClient, "Blah", 1) @@ -72,6 +73,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness { } } + @Test def testWrongReplicaCount { try { AdminUtils.addPartitions(zkClient, topic1, 2, "0:1,0:1:2") @@ -82,6 +84,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness { } } + @Test def testIncrementPartitions { AdminUtils.addPartitions(zkClient, topic1, 3) // wait until leader is elected @@ -107,6 +110,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness { assert(replicas.contains(partitionDataForTopic1(1).leader.get)) } + @Test def testManualAssignmentOfReplicas { AdminUtils.addPartitions(zkClient, topic2, 3, "1:2,0:1,2:3") // wait until leader is elected @@ -133,6 +137,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness { assert(replicas(1).id == 0 || replicas(1).id == 1) } + @Test def testReplicaPlacement { AdminUtils.addPartitions(zkClient, topic3, 7) diff --git a/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala b/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala index c1071b8c84f..dba1afb3c4a 100644 --- a/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala +++ b/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala @@ -26,9 +26,11 @@ import kafka.common.TopicAndPartition import kafka.consumer.PartitionAssignorTest.StaticSubscriptionInfo import kafka.consumer.PartitionAssignorTest.Scenario import kafka.consumer.PartitionAssignorTest.WildcardSubscriptionInfo +import org.junit.Test class PartitionAssignorTest extends Logging { + @Test def testRoundRobinPartitionAssignor() { val assignor = new RoundRobinAssignor @@ -52,6 +54,7 @@ class PartitionAssignorTest extends Logging { }) } + @Test def testRangePartitionAssignor() { val assignor = new RangeAssignor (1 to PartitionAssignorTest.TestCaseCount).foreach (testCase => { diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala index c851e270c04..cb595426318 100644 --- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala +++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala @@ -30,7 +30,7 @@ import kafka.utils.TestUtils._ import kafka.utils._ import org.I0Itec.zkclient.ZkClient import org.apache.log4j.{Level, Logger} -import org.junit.{After, Before} +import org.junit.{Test, After, Before} import scala.collection._ @@ -65,6 +65,7 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging super.tearDown() } + @Test def testBasic() { val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler]) requestHandlerLogger.setLevel(Level.FATAL) @@ -175,7 +176,7 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging requestHandlerLogger.setLevel(Level.ERROR) } - + @Test def testCompression() { val requestHandlerLogger = Logger.getLogger(classOf[kafka.server.KafkaRequestHandler]) requestHandlerLogger.setLevel(Level.FATAL) @@ -255,6 +256,7 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging requestHandlerLogger.setLevel(Level.ERROR) } + @Test def testCompressionSetConsumption() { // send some messages to each broker val sentMessages = sendMessages(servers, topic, 200, 0, DefaultCompressionCodec) ++ @@ -278,6 +280,7 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging zkConsumerConnector1.shutdown } + @Test def testConsumerDecoder() { val requestHandlerLogger = Logger.getLogger(classOf[kafka.server.KafkaRequestHandler]) requestHandlerLogger.setLevel(Level.FATAL) @@ -317,6 +320,7 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging requestHandlerLogger.setLevel(Level.ERROR) } + @Test def testLeaderSelectionForPartition() { val zkClient = ZkUtils.createZkClient(zkConnect, 6000, 30000) @@ -348,6 +352,7 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging zkClient.close() } + @Test def testConsumerRebalanceListener() { // Send messages to create topic sendMessages(servers, topic, nMessages, 0) diff --git a/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala index 206a7c30db1..0e38a180824 100644 --- a/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala +++ b/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala @@ -54,10 +54,12 @@ class ControllerFailoverTest extends KafkaServerTestHarness with Logging { override def generateConfigs() = TestUtils.createBrokerConfigs(numNodes, zkConnect) .map(KafkaConfig.fromProps(_, overridingProps)) + @Before override def setUp() { super.setUp() } + @After override def tearDown() { super.tearDown() } @@ -66,6 +68,7 @@ class ControllerFailoverTest extends KafkaServerTestHarness with Logging { * See @link{https://issues.apache.org/jira/browse/KAFKA-2300} * for the background of this test case */ + @Test def testMetadataUpdate() { log.setLevel(Level.INFO) var controller: KafkaServer = this.servers.head; diff --git a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala index 92af0a1b175..c061597a6d0 100644 --- a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala +++ b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala @@ -19,7 +19,7 @@ package kafka.integration import java.util.concurrent._ import java.util.concurrent.atomic._ -import org.junit.{After, Before} +import org.junit.{Test, After, Before} import scala.collection._ import org.junit.Assert._ @@ -65,6 +65,7 @@ class FetcherTest extends KafkaServerTestHarness { super.tearDown } + @Test def testFetcher() { val perNode = 2 var count = TestUtils.sendMessages(servers, topic, perNode).size diff --git a/core/src/test/scala/unit/kafka/integration/MinIsrConfigTest.scala b/core/src/test/scala/unit/kafka/integration/MinIsrConfigTest.scala index 3c1caded054..397760123c7 100644 --- a/core/src/test/scala/unit/kafka/integration/MinIsrConfigTest.scala +++ b/core/src/test/scala/unit/kafka/integration/MinIsrConfigTest.scala @@ -21,6 +21,7 @@ import java.util.Properties import kafka.server.KafkaConfig import kafka.utils.TestUtils +import org.junit.Test class MinIsrConfigTest extends KafkaServerTestHarness { @@ -28,6 +29,7 @@ class MinIsrConfigTest extends KafkaServerTestHarness { overridingProps.put(KafkaConfig.MinInSyncReplicasProp, "5") def generateConfigs() = TestUtils.createBrokerConfigs(1, zkConnect).map(KafkaConfig.fromProps(_, overridingProps)) + @Test def testDefaultKafkaConfig() { assert(servers.head.getLogManager().defaultConfig.minInSyncReplicas == 5) } diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala index e05d16bfb78..e6f0c543142 100755 --- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala +++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala @@ -24,6 +24,7 @@ import kafka.server.{KafkaRequestHandler, KafkaConfig} import kafka.producer.{KeyedMessage, Producer} import org.apache.log4j.{Level, Logger} import kafka.zk.ZooKeeperTestHarness +import org.junit.Test import scala.collection._ import kafka.common.{TopicAndPartition, ErrorMapping, UnknownTopicOrPartitionException, OffsetOutOfRangeException} import kafka.utils.{StaticPartitioner, TestUtils, CoreUtils} @@ -38,6 +39,7 @@ class PrimitiveApiTest extends ProducerConsumerTestHarness with ZooKeeperTestHar def generateConfigs() = List(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect))) + @Test def testFetchRequestCanProperlySerialize() { val request = new FetchRequestBuilder() .clientId("test-client") @@ -54,6 +56,7 @@ class PrimitiveApiTest extends ProducerConsumerTestHarness with ZooKeeperTestHar assertEquals(request, deserializedRequest) } + @Test def testEmptyFetchRequest() { val partitionRequests = immutable.Map[TopicAndPartition, PartitionFetchInfo]() val request = new FetchRequest(requestInfo = partitionRequests) @@ -61,6 +64,7 @@ class PrimitiveApiTest extends ProducerConsumerTestHarness with ZooKeeperTestHar assertTrue(!fetched.hasError && fetched.data.size == 0) } + @Test def testDefaultEncoderProducerAndFetch() { val topic = "test-topic" @@ -84,6 +88,7 @@ class PrimitiveApiTest extends ProducerConsumerTestHarness with ZooKeeperTestHar assertEquals("test-message", TestUtils.readString(fetchedMessageAndOffset.message.payload, "UTF-8")) } + @Test def testDefaultEncoderProducerAndFetchWithCompression() { val topic = "test-topic" val props = new Properties() @@ -170,6 +175,7 @@ class PrimitiveApiTest extends ProducerConsumerTestHarness with ZooKeeperTestHar requestHandlerLogger.setLevel(Level.ERROR) } + @Test def testProduceAndMultiFetch() { produceAndMultiFetch(producer) } @@ -196,10 +202,12 @@ class PrimitiveApiTest extends ProducerConsumerTestHarness with ZooKeeperTestHar } } + @Test def testMultiProduce() { multiProduce(producer) } + @Test def testConsumerEmptyTopic() { val newTopic = "new-topic" TestUtils.createTopic(zkClient, newTopic, numPartitions = 1, replicationFactor = 1, servers = servers) @@ -208,6 +216,7 @@ class PrimitiveApiTest extends ProducerConsumerTestHarness with ZooKeeperTestHar assertFalse(fetchResponse.messageSet(newTopic, 0).iterator.hasNext) } + @Test def testPipelinedProduceRequests() { val topics = Map("test4" -> 0, "test1" -> 0, "test2" -> 0, "test3" -> 0) topics.keys.map(topic => TestUtils.createTopic(zkClient, topic, servers = servers)) diff --git a/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala b/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala index 2fd10d8afc1..4d73be1e51c 100755 --- a/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala +++ b/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala @@ -17,7 +17,7 @@ package kafka.integration -import org.junit.{After, Before} +import org.junit.{Test, After, Before} import kafka.zk.ZooKeeperTestHarness import kafka.utils.TestUtils._ import org.junit.Assert._ @@ -47,6 +47,7 @@ class RollingBounceTest extends ZooKeeperTestHarness { super.tearDown() } + @Test def testRollingBounce { // start all the brokers val topic1 = "new-topic1" diff --git a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala index 24f0a07d483..5e32d5910c4 100644 --- a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala +++ b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala @@ -30,7 +30,7 @@ import kafka.utils.TestUtils._ import kafka.zk.ZooKeeperTestHarness import org.apache.kafka.common.protocol.SecurityProtocol import org.junit.Assert._ -import org.junit.{After, Before} +import org.junit.{Test, After, Before} class TopicMetadataTest extends ZooKeeperTestHarness { private var server1: KafkaServer = null @@ -54,6 +54,7 @@ class TopicMetadataTest extends ZooKeeperTestHarness { super.tearDown() } + @Test def testTopicMetadataRequest { // create topic val topic = "test" @@ -70,6 +71,7 @@ class TopicMetadataTest extends ZooKeeperTestHarness { assertEquals(topicMetadataRequest, deserializedMetadataRequest) } + @Test def testBasicTopicMetadata { // create topic val topic = "test" @@ -87,6 +89,7 @@ class TopicMetadataTest extends ZooKeeperTestHarness { assertEquals(1, partitionMetadata.head.replicas.size) } + @Test def testGetAllTopicMetadata { // create topic val topic1 = "testGetAllTopicMetadata1" @@ -111,6 +114,7 @@ class TopicMetadataTest extends ZooKeeperTestHarness { assertEquals(1, partitionMetadataTopic2.head.replicas.size) } + @Test def testAutoCreateTopic { // auto create topic val topic = "testAutoCreateTopic" @@ -137,6 +141,7 @@ class TopicMetadataTest extends ZooKeeperTestHarness { assertTrue(partitionMetadata.head.leader.isDefined) } + @Test def testAutoCreateTopicWithCollision { // auto create topic val topic1 = "testAutoCreate_Topic" @@ -199,7 +204,7 @@ class TopicMetadataTest extends ZooKeeperTestHarness { }) } - + @Test def testIsrAfterBrokerShutDownAndJoinsBack { val numBrokers = 2 //just 2 brokers are enough for the test @@ -250,10 +255,12 @@ class TopicMetadataTest extends ZooKeeperTestHarness { } + @Test def testAliveBrokerListWithNoTopics { checkMetadata(Seq(server1), 1) } + @Test def testAliveBrokersListWithNoTopicsAfterNewBrokerStartup { var adHocServers = adHocConfigs.takeRight(adHocConfigs.size - 1).map(p => createServer(p)) @@ -267,6 +274,7 @@ class TopicMetadataTest extends ZooKeeperTestHarness { } + @Test def testAliveBrokersListWithNoTopicsAfterABrokerShutdown { val adHocServers = adHocConfigs.map(p => createServer(p)) diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala index 28f6cc3b58f..4dba7dce57e 100755 --- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala +++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala @@ -18,7 +18,7 @@ package kafka.integration import org.apache.kafka.common.config.ConfigException -import org.junit.{After, Before} +import org.junit.{Test, After, Before} import scala.util.Random import org.apache.log4j.{Level, Logger} @@ -99,6 +99,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness { } } + @Test def testUncleanLeaderElectionEnabled { // unclean leader election is enabled by default startBrokers(Seq(configProps1, configProps2)) @@ -109,6 +110,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness { verifyUncleanLeaderElectionEnabled } + @Test def testUncleanLeaderElectionDisabled { // disable unclean leader election configProps1.put("unclean.leader.election.enable", String.valueOf(false)) @@ -121,6 +123,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness { verifyUncleanLeaderElectionDisabled } + @Test def testUncleanLeaderElectionEnabledByTopicOverride { // disable unclean leader election globally, but enable for our specific test topic configProps1.put("unclean.leader.election.enable", String.valueOf(false)) @@ -136,6 +139,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness { verifyUncleanLeaderElectionEnabled } + @Test def testCleanLeaderElectionDisabledByTopicOverride { // enable unclean leader election globally, but disable for our specific test topic configProps1.put("unclean.leader.election.enable", String.valueOf(true)) @@ -151,6 +155,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness { verifyUncleanLeaderElectionDisabled } + @Test def testUncleanLeaderElectionInvalidTopicOverride { startBrokers(Seq(configProps1)) diff --git a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala index cf6b9a92486..f4e01270321 100644 --- a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala +++ b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala @@ -29,6 +29,7 @@ import kafka.utils.{Logging, TestUtils} import kafka.consumer.{KafkaStream, ConsumerConfig} import kafka.zk.ZooKeeperTestHarness import kafka.common.MessageStreamsExistException +import org.junit.Test import scala.collection.JavaConversions @@ -50,6 +51,7 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with ZooKeep val consumer1 = "consumer1" val nMessages = 2 + @Test def testBasic() { val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler]) requestHandlerLogger.setLevel(Level.FATAL) diff --git a/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala b/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala index 066f506df5f..7b55f79b3bf 100755 --- a/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala +++ b/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala @@ -21,7 +21,7 @@ import org.junit.Assert._ import kafka.utils.{TestUtils, CoreUtils, ZkUtils} import kafka.zk.ZooKeeperTestHarness import org.apache.kafka.common.protocol.SecurityProtocol -import org.junit.{After, Before} +import org.junit.{Test, After, Before} class AdvertiseBrokerTest extends ZooKeeperTestHarness { var server : KafkaServer = null @@ -46,7 +46,8 @@ class AdvertiseBrokerTest extends ZooKeeperTestHarness { CoreUtils.rm(server.config.logDirs) super.tearDown() } - + + @Test def testBrokerAdvertiseToZK { val brokerInfo = ZkUtils.getBrokerInfo(zkClient, brokerId) val endpoint = brokerInfo.get.endPoints.get(SecurityProtocol.PLAINTEXT).get diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala index 7f55a80453d..0c6d23d1b72 100755 --- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala +++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala @@ -43,6 +43,7 @@ class HighwatermarkPersistenceTest { CoreUtils.rm(dir) } + @Test def testHighWatermarkPersistenceSinglePartition() { // mock zkclient val zkClient = EasyMock.createMock(classOf[ZkClient]) @@ -78,6 +79,7 @@ class HighwatermarkPersistenceTest { replicaManager.shutdown(false) } + @Test def testHighWatermarkPersistenceMultiplePartitions() { val topic1 = "foo1" val topic2 = "foo2" diff --git a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala index 25f0d4194f8..977b29a18f5 100644 --- a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala +++ b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala @@ -18,7 +18,7 @@ package kafka.server import java.util.Properties -import org.junit.{Before, After} +import org.junit.{Test, Before, After} import collection.mutable.HashMap import collection.mutable.Map import kafka.cluster.{Partition, Replica} @@ -59,6 +59,7 @@ class IsrExpirationTest { /* * Test the case where a follower is caught up but stops making requests to the leader. Once beyond the configured time limit, it should fall out of ISR */ + @Test def testIsrExpirationForStuckFollowers() { val log = getLogWithLogEndOffset(15L, 2) // set logEndOffset for leader to 15L @@ -89,6 +90,7 @@ class IsrExpirationTest { /* * Test the case where a follower never makes a fetch request. It should fall out of ISR because it will be declared stuck */ + @Test def testIsrExpirationIfNoFetchRequestMade() { val log = getLogWithLogEndOffset(15L, 1) // set logEndOffset for leader to 15L @@ -109,6 +111,7 @@ class IsrExpirationTest { * Test the case where a follower continually makes fetch requests but is unable to catch up. It should fall out of the ISR * However, any time it makes a request to the LogEndOffset it should be back in the ISR */ + @Test def testIsrExpirationForSlowFollowers() { // create leader replica val log = getLogWithLogEndOffset(15L, 4) diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala index f77f1860c28..bb12a50f6c9 100755 --- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala +++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala @@ -26,7 +26,7 @@ import kafka.controller.{ControllerChannelManager, ControllerContext, LeaderIsrA import kafka.utils.TestUtils._ import kafka.zk.ZooKeeperTestHarness import org.apache.kafka.common.protocol.SecurityProtocol -import org.junit.{After, Before} +import org.junit.{Test, After, Before} class LeaderElectionTest extends ZooKeeperTestHarness { val brokerId1 = 0 @@ -56,6 +56,7 @@ class LeaderElectionTest extends ZooKeeperTestHarness { super.tearDown() } + @Test def testLeaderElectionAndEpoch { // start 2 brokers val topic = "new-topic" @@ -101,6 +102,7 @@ class LeaderElectionTest extends ZooKeeperTestHarness { assertEquals("Second epoch value should be %d".format(leaderEpoch2+1) , leaderEpoch2+1, leaderEpoch3) } + @Test def testLeaderElectionWithStaleControllerEpoch() { // start 2 brokers val topic = "new-topic" diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala index 7a0d0b2e3ff..46829b818ac 100755 --- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala +++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala @@ -27,7 +27,7 @@ import kafka.serializer.StringEncoder import java.io.File -import org.junit.{After, Before} +import org.junit.{Test, After, Before} import org.junit.Assert._ class LogRecoveryTest extends ZooKeeperTestHarness { @@ -97,6 +97,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness { super.tearDown() } + @Test def testHWCheckpointNoFailuresSingleLogSegment { val numMessages = 2L sendMessages(numMessages.toInt) @@ -113,6 +114,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness { assertEquals(numMessages, followerHW) } + @Test def testHWCheckpointWithFailuresSingleLogSegment { var leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId) @@ -163,6 +165,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness { assertEquals(hw, hwFile2.read.getOrElse(TopicAndPartition(topic, 0), 0L)) } + @Test def testHWCheckpointNoFailuresMultipleLogSegments { sendMessages(20) val hw = 20L @@ -178,6 +181,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness { assertEquals(hw, followerHW) } + @Test def testHWCheckpointWithFailuresMultipleLogSegments { var leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId) diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala index dead087e76e..e40bf3b906f 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala @@ -17,7 +17,7 @@ package kafka.server -import org.junit.{After, Before} +import org.junit.{Test, After, Before} import kafka.zk.ZooKeeperTestHarness import kafka.utils.TestUtils._ import kafka.producer.KeyedMessage @@ -44,6 +44,7 @@ class ReplicaFetchTest extends ZooKeeperTestHarness { super.tearDown() } + @Test def testReplicaFetcherThread() { val partition = 0 val testMessageList1 = List("test1", "test2", "test3", "test4") diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala index 2a8da0c128b..102dba95290 100755 --- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala +++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala @@ -159,6 +159,7 @@ class ServerShutdownTest extends ZooKeeperTestHarness { .count(isNonDaemonKafkaThread)) } + @Test def testConsecutiveShutdown(){ val server = new KafkaServer(config) try { diff --git a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala index 7d986ad6dd7..0adc0aa3942 100755 --- a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala +++ b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala @@ -23,9 +23,11 @@ import kafka.utils.TestUtils import kafka.zk.ZooKeeperTestHarness import org.junit.Assert._ +import org.junit.Test class ServerStartupTest extends ZooKeeperTestHarness { + @Test def testBrokerCreatesZKChroot { val brokerId = 0 val zookeeperChroot = "/kafka-chroot-for-unittest" @@ -41,6 +43,7 @@ class ServerStartupTest extends ZooKeeperTestHarness { CoreUtils.rm(server.config.logDirs) } + @Test def testConflictBrokerRegistration { // Try starting a broker with the a conflicting broker id. // This shouldn't affect the existing broker registration. diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala index d950665a64d..ba584a2bcf5 100644 --- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala +++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala @@ -22,7 +22,7 @@ import kafka.cluster.Replica import kafka.common.TopicAndPartition import kafka.log.Log import kafka.message.{MessageSet, ByteBufferMessageSet, Message} -import org.junit.{After, Before} +import org.junit.{Test, After, Before} import java.util.{Properties, Collections} import java.util.concurrent.atomic.AtomicBoolean @@ -136,6 +136,7 @@ class SimpleFetchTest { * * This test also verifies counts of fetch requests recorded by the ReplicaManager */ + @Test def testReadFromLog() { val initialTopicCount = BrokerTopicStats.getBrokerTopicStats(topic).totalFetchRequestRate.count(); val initialAllTopicsCount = BrokerTopicStats.getBrokerAllTopicsStats().totalFetchRequestRate.count(); diff --git a/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala b/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala index 247aa6e4a7c..f240e89b67d 100644 --- a/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala +++ b/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala @@ -20,11 +20,12 @@ package kafka.zk import kafka.consumer.ConsumerConfig import kafka.utils.ZkUtils import kafka.utils.TestUtils -import org.junit.Assert +import org.junit.{Test, Assert} class ZKEphemeralTest extends ZooKeeperTestHarness { var zkSessionTimeoutMs = 1000 + @Test def testEphemeralNodeCleanup = { val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1")) var zkClient = ZkUtils.createZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs) diff --git a/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala b/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala index 35c635a10ba..241eea51d91 100644 --- a/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala +++ b/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala @@ -21,6 +21,7 @@ import kafka.consumer.ConsumerConfig import kafka.utils.{ZkPath, TestUtils, ZkUtils} import org.apache.kafka.common.config.ConfigException import org.junit.Assert._ +import org.junit.Test class ZKPathTest extends ZooKeeperTestHarness { @@ -28,6 +29,7 @@ class ZKPathTest extends ZooKeeperTestHarness { val zkSessionTimeoutMs = 1000 def zkConnectWithInvalidRoot: String = zkConnect + "/ghost" + @Test def testCreatePersistentPathThrowsException { val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnectWithInvalidRoot, "test", "1")) @@ -43,6 +45,7 @@ class ZKPathTest extends ZooKeeperTestHarness { } } + @Test def testCreatePersistentPath { val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1")) var zkClient = ZkUtils.createZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs) @@ -56,6 +59,7 @@ class ZKPathTest extends ZooKeeperTestHarness { assertTrue("Failed to create persistent path", ZkUtils.pathExists(zkClient, path)) } + @Test def testMakeSurePersistsPathExistsThrowsException { val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnectWithInvalidRoot, "test", "1")) @@ -71,6 +75,7 @@ class ZKPathTest extends ZooKeeperTestHarness { } } + @Test def testMakeSurePersistsPathExists { val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1")) var zkClient = ZkUtils.createZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs) @@ -84,6 +89,7 @@ class ZKPathTest extends ZooKeeperTestHarness { assertTrue("Failed to create persistent path", ZkUtils.pathExists(zkClient, path)) } + @Test def testCreateEphemeralPathThrowsException { val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnectWithInvalidRoot, "test", "1")) @@ -99,6 +105,7 @@ class ZKPathTest extends ZooKeeperTestHarness { } } + @Test def testCreateEphemeralPathExists { val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1")) var zkClient = ZkUtils.createZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs) @@ -112,6 +119,7 @@ class ZKPathTest extends ZooKeeperTestHarness { assertTrue("Failed to create ephemeral path", ZkUtils.pathExists(zkClient, path)) } + @Test def testCreatePersistentSequentialThrowsException { val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnectWithInvalidRoot, "test", "1")) @@ -127,6 +135,7 @@ class ZKPathTest extends ZooKeeperTestHarness { } } + @Test def testCreatePersistentSequentialExists { val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1")) var zkClient = ZkUtils.createZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs)