mirror of https://github.com/apache/kafka.git
KAFKA-1782: Follow up - add missing @Test annotations.
parent c8e62c9818
commit 1dcaf39d48
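For context: these suites run under JUnit 4, whose runner only executes methods carrying the @Test annotation. A test method without it compiles cleanly but is silently skipped, which is presumably how the tests below dropped out of the build. The patch therefore folds Test into each file's existing org.junit import and annotates every affected method. A minimal sketch of the pattern follows; the class and method names are illustrative, not taken from the patch:

import org.junit.{Test, Before}
import org.junit.Assert._

class ExampleTest {
  var fixture: String = _

  @Before
  def setUp() {
    fixture = "ready"
  }

  // Without @Test, JUnit 4 never invokes this method, so a regression
  // here would pass unnoticed; the annotation is what registers it.
  @Test
  def testFixtureIsReady() {
    assertEquals("ready", fixture)
  }
}
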
@@ -19,7 +19,7 @@ import org.apache.kafka.clients.consumer._
 import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.TopicPartition
 import org.junit.Assert._
-import org.junit.Before
+import org.junit.{Test, Before}

 import scala.collection.JavaConversions._

@@ -61,6 +61,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
     TestUtils.createTopic(this.zkClient, topic, 1, serverCount, this.servers)
   }

+  @Test
   def testConsumptionWithBrokerFailures() = consumeWithBrokerFailures(10)

   /*
@@ -96,6 +97,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
     scheduler.shutdown()
   }

+  @Test
   def testSeekAndCommitWithBrokerFailures() = seekAndCommitWithBrokerFailures(5)

   def seekAndCommitWithBrokerFailures(numIters: Int) {

@@ -25,7 +25,7 @@ import kafka.server.KafkaConfig

 import java.util.ArrayList
 import org.junit.Assert._
-import org.junit.Before
+import org.junit.{Test, Before}

 import scala.collection.JavaConverters._
 import kafka.coordinator.ConsumerCoordinator
@@ -65,6 +65,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
     TestUtils.createTopic(this.zkClient, topic, 2, serverCount, this.servers)
   }

+  @Test
   def testSimpleConsumption() {
     val numRecords = 10000
     sendRecords(numRecords)
@@ -86,6 +87,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
     awaitCommitCallback(this.consumers(0), commitCallback)
   }

+  @Test
   def testCommitSpecifiedOffsets() {
     sendRecords(5, tp)
     sendRecords(7, tp2)
@@ -116,12 +118,14 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
     assertEquals(7, this.consumers(0).committed(tp2))
   }

+  @Test
   def testAutoOffsetReset() {
     sendRecords(1)
     this.consumers(0).subscribe(tp)
     consumeRecords(this.consumers(0), numRecords = 1, startingOffset = 0)
   }

+  @Test
   def testSeek() {
     val consumer = this.consumers(0)
     val totalRecords = 50L
@@ -142,12 +146,14 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
     consumeRecords(consumer, numRecords = 1, startingOffset = mid.toInt)
   }

+  @Test
   def testGroupConsumption() {
     sendRecords(10)
     this.consumers(0).subscribe(topic)
     consumeRecords(this.consumers(0), numRecords = 1, startingOffset = 0)
   }

+  @Test
   def testPositionAndCommit() {
     sendRecords(5)

@@ -179,6 +185,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
     consumeRecords(this.consumers(1), 1, 5)
   }

+  @Test
   def testPartitionsFor() {
     val numParts = 2
     TestUtils.createTopic(this.zkClient, "part-test", numParts, 1, this.servers)
@@ -188,6 +195,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
     assertNull(this.consumers(0).partitionsFor("non-exist-topic"))
   }

+  @Test
   def testListTopics() {
     val numParts = 2
     val topic1: String = "part-test-topic-1"
@@ -206,6 +214,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
     assertEquals(2, topics.get(topic3).size)
   }

+  @Test
   def testPartitionReassignmentCallback() {
     val callback = new TestConsumerReassignmentCallback()
     this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100"); // timeout quickly to avoid slow test
@@ -238,6 +247,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
     consumer0.close()
   }

+  @Test
   def testUnsubscribeTopic() {
     val callback = new TestConsumerReassignmentCallback()
     this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100"); // timeout quickly to avoid slow test
@@ -258,6 +268,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
     }
   }

+  @Test
   def testExpandingTopicSubscriptions() {
     val otherTopic = "other"
     val subscriptions = Set(new TopicPartition(topic, 0), new TopicPartition(topic, 1))
@@ -276,6 +287,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
     }, s"Expected partitions ${expandedSubscriptions.asJava} but actually got ${this.consumers(0).subscriptions}")
   }

+  @Test
   def testShrinkingTopicSubscriptions() {
     val otherTopic = "other"
     TestUtils.createTopic(this.zkClient, otherTopic, 2, serverCount, this.servers)
@@ -294,6 +306,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
     }, s"Expected partitions ${shrunkenSubscriptions.asJava} but actually got ${this.consumers(0).subscriptions}")
   }

+  @Test
   def testPartitionPauseAndResume() {
     sendRecords(5)
     this.consumers(0).subscribe(tp)
@@ -305,6 +318,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
     consumeRecords(this.consumers(0), 5, 5)
   }

+  @Test
   def testPauseStateNotPreservedByRebalance() {
     this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100"); // timeout quickly to avoid slow test
     this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "30");

@@ -25,7 +25,7 @@ import kafka.utils.{ZkUtils, CoreUtils, TestUtils}
 import kafka.cluster.Broker
 import kafka.client.ClientUtils
 import kafka.server.{KafkaConfig, KafkaServer}
-import org.junit.{After, Before}
+import org.junit.{Test, After, Before}

 class AddPartitionsTest extends ZooKeeperTestHarness {
   var configs: Seq[KafkaConfig] = null
@@ -62,6 +62,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
     super.tearDown()
   }

+  @Test
   def testTopicDoesNotExist {
     try {
       AdminUtils.addPartitions(zkClient, "Blah", 1)
@@ -72,6 +73,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
     }
   }

+  @Test
   def testWrongReplicaCount {
     try {
       AdminUtils.addPartitions(zkClient, topic1, 2, "0:1,0:1:2")
@@ -82,6 +84,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
     }
   }

+  @Test
   def testIncrementPartitions {
     AdminUtils.addPartitions(zkClient, topic1, 3)
     // wait until leader is elected
@@ -107,6 +110,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
     assert(replicas.contains(partitionDataForTopic1(1).leader.get))
   }

+  @Test
   def testManualAssignmentOfReplicas {
     AdminUtils.addPartitions(zkClient, topic2, 3, "1:2,0:1,2:3")
     // wait until leader is elected
@@ -133,6 +137,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
     assert(replicas(1).id == 0 || replicas(1).id == 1)
   }

+  @Test
   def testReplicaPlacement {
     AdminUtils.addPartitions(zkClient, topic3, 7)

@@ -26,9 +26,11 @@ import kafka.common.TopicAndPartition
 import kafka.consumer.PartitionAssignorTest.StaticSubscriptionInfo
 import kafka.consumer.PartitionAssignorTest.Scenario
 import kafka.consumer.PartitionAssignorTest.WildcardSubscriptionInfo
+import org.junit.Test

 class PartitionAssignorTest extends Logging {

+  @Test
   def testRoundRobinPartitionAssignor() {
     val assignor = new RoundRobinAssignor

@@ -52,6 +54,7 @@ class PartitionAssignorTest extends Logging {
     })
   }

+  @Test
   def testRangePartitionAssignor() {
     val assignor = new RangeAssignor
     (1 to PartitionAssignorTest.TestCaseCount).foreach (testCase => {

@@ -30,7 +30,7 @@ import kafka.utils.TestUtils._
 import kafka.utils._
 import org.I0Itec.zkclient.ZkClient
 import org.apache.log4j.{Level, Logger}
-import org.junit.{After, Before}
+import org.junit.{Test, After, Before}

 import scala.collection._

@@ -65,6 +65,7 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
     super.tearDown()
   }

+  @Test
   def testBasic() {
     val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
     requestHandlerLogger.setLevel(Level.FATAL)
@@ -175,7 +176,7 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
     requestHandlerLogger.setLevel(Level.ERROR)
   }

-
+  @Test
   def testCompression() {
     val requestHandlerLogger = Logger.getLogger(classOf[kafka.server.KafkaRequestHandler])
     requestHandlerLogger.setLevel(Level.FATAL)
@@ -255,6 +256,7 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
     requestHandlerLogger.setLevel(Level.ERROR)
   }

+  @Test
   def testCompressionSetConsumption() {
     // send some messages to each broker
     val sentMessages = sendMessages(servers, topic, 200, 0, DefaultCompressionCodec) ++
@@ -278,6 +280,7 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
     zkConsumerConnector1.shutdown
   }

+  @Test
   def testConsumerDecoder() {
     val requestHandlerLogger = Logger.getLogger(classOf[kafka.server.KafkaRequestHandler])
     requestHandlerLogger.setLevel(Level.FATAL)
@@ -317,6 +320,7 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
     requestHandlerLogger.setLevel(Level.ERROR)
   }

+  @Test
   def testLeaderSelectionForPartition() {
     val zkClient = ZkUtils.createZkClient(zkConnect, 6000, 30000)

@@ -348,6 +352,7 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
     zkClient.close()
   }

+  @Test
   def testConsumerRebalanceListener() {
     // Send messages to create topic
     sendMessages(servers, topic, nMessages, 0)

@@ -66,6 +66,7 @@ class ControllerFailoverTest extends KafkaServerTestHarness with Logging {
    * See @link{https://issues.apache.org/jira/browse/KAFKA-2300}
    * for the background of this test case
    */
+  @Test
   def testMetadataUpdate() {
     log.setLevel(Level.INFO)
     var controller: KafkaServer = this.servers.head;

@@ -19,7 +19,7 @@ package kafka.integration

 import java.util.concurrent._
 import java.util.concurrent.atomic._
-import org.junit.{After, Before}
+import org.junit.{Test, After, Before}

 import scala.collection._
 import org.junit.Assert._
@@ -65,6 +65,7 @@ class FetcherTest extends KafkaServerTestHarness {
     super.tearDown
   }

+  @Test
   def testFetcher() {
     val perNode = 2
     var count = TestUtils.sendMessages(servers, topic, perNode).size

@@ -21,6 +21,7 @@ import java.util.Properties

 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
+import org.junit.Test

 class MinIsrConfigTest extends KafkaServerTestHarness {

@@ -28,6 +29,7 @@ class MinIsrConfigTest extends KafkaServerTestHarness {
   overridingProps.put(KafkaConfig.MinInSyncReplicasProp, "5")
   def generateConfigs() = TestUtils.createBrokerConfigs(1, zkConnect).map(KafkaConfig.fromProps(_, overridingProps))

+  @Test
   def testDefaultKafkaConfig() {
     assert(servers.head.getLogManager().defaultConfig.minInSyncReplicas == 5)
   }

@@ -24,6 +24,7 @@ import kafka.server.{KafkaRequestHandler, KafkaConfig}
 import kafka.producer.{KeyedMessage, Producer}
 import org.apache.log4j.{Level, Logger}
 import kafka.zk.ZooKeeperTestHarness
+import org.junit.Test
 import scala.collection._
 import kafka.common.{TopicAndPartition, ErrorMapping, UnknownTopicOrPartitionException, OffsetOutOfRangeException}
 import kafka.utils.{StaticPartitioner, TestUtils, CoreUtils}
@@ -38,6 +39,7 @@ class PrimitiveApiTest extends ProducerConsumerTestHarness with ZooKeeperTestHarness {

   def generateConfigs() = List(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect)))

+  @Test
   def testFetchRequestCanProperlySerialize() {
     val request = new FetchRequestBuilder()
       .clientId("test-client")
@@ -54,6 +56,7 @@ class PrimitiveApiTest extends ProducerConsumerTestHarness with ZooKeeperTestHarness {
     assertEquals(request, deserializedRequest)
   }

+  @Test
   def testEmptyFetchRequest() {
     val partitionRequests = immutable.Map[TopicAndPartition, PartitionFetchInfo]()
     val request = new FetchRequest(requestInfo = partitionRequests)
@@ -61,6 +64,7 @@ class PrimitiveApiTest extends ProducerConsumerTestHarness with ZooKeeperTestHarness {
     assertTrue(!fetched.hasError && fetched.data.size == 0)
   }

+  @Test
   def testDefaultEncoderProducerAndFetch() {
     val topic = "test-topic"

@@ -84,6 +88,7 @@ class PrimitiveApiTest extends ProducerConsumerTestHarness with ZooKeeperTestHarness {
     assertEquals("test-message", TestUtils.readString(fetchedMessageAndOffset.message.payload, "UTF-8"))
   }

+  @Test
   def testDefaultEncoderProducerAndFetchWithCompression() {
     val topic = "test-topic"
     val props = new Properties()
@@ -170,6 +175,7 @@ class PrimitiveApiTest extends ProducerConsumerTestHarness with ZooKeeperTestHarness {
     requestHandlerLogger.setLevel(Level.ERROR)
   }

+  @Test
   def testProduceAndMultiFetch() {
     produceAndMultiFetch(producer)
   }
@@ -196,10 +202,12 @@ class PrimitiveApiTest extends ProducerConsumerTestHarness with ZooKeeperTestHarness {
     }
   }

+  @Test
   def testMultiProduce() {
     multiProduce(producer)
   }

+  @Test
   def testConsumerEmptyTopic() {
     val newTopic = "new-topic"
     TestUtils.createTopic(zkClient, newTopic, numPartitions = 1, replicationFactor = 1, servers = servers)
@@ -208,6 +216,7 @@ class PrimitiveApiTest extends ProducerConsumerTestHarness with ZooKeeperTestHarness {
     assertFalse(fetchResponse.messageSet(newTopic, 0).iterator.hasNext)
   }

+  @Test
   def testPipelinedProduceRequests() {
     val topics = Map("test4" -> 0, "test1" -> 0, "test2" -> 0, "test3" -> 0)
     topics.keys.map(topic => TestUtils.createTopic(zkClient, topic, servers = servers))

@@ -17,7 +17,7 @@

 package kafka.integration

-import org.junit.{After, Before}
+import org.junit.{Test, After, Before}
 import kafka.zk.ZooKeeperTestHarness
 import kafka.utils.TestUtils._
 import org.junit.Assert._
@@ -47,6 +47,7 @@ class RollingBounceTest extends ZooKeeperTestHarness {
     super.tearDown()
   }

+  @Test
   def testRollingBounce {
     // start all the brokers
     val topic1 = "new-topic1"

@@ -30,7 +30,7 @@ import kafka.utils.TestUtils._
 import kafka.zk.ZooKeeperTestHarness
 import org.apache.kafka.common.protocol.SecurityProtocol
 import org.junit.Assert._
-import org.junit.{After, Before}
+import org.junit.{Test, After, Before}

 class TopicMetadataTest extends ZooKeeperTestHarness {
   private var server1: KafkaServer = null
@@ -54,6 +54,7 @@ class TopicMetadataTest extends ZooKeeperTestHarness {
     super.tearDown()
   }

+  @Test
   def testTopicMetadataRequest {
     // create topic
     val topic = "test"
@@ -70,6 +71,7 @@ class TopicMetadataTest extends ZooKeeperTestHarness {
     assertEquals(topicMetadataRequest, deserializedMetadataRequest)
   }

+  @Test
   def testBasicTopicMetadata {
     // create topic
     val topic = "test"
@@ -87,6 +89,7 @@ class TopicMetadataTest extends ZooKeeperTestHarness {
     assertEquals(1, partitionMetadata.head.replicas.size)
   }

+  @Test
   def testGetAllTopicMetadata {
     // create topic
     val topic1 = "testGetAllTopicMetadata1"
@@ -111,6 +114,7 @@ class TopicMetadataTest extends ZooKeeperTestHarness {
     assertEquals(1, partitionMetadataTopic2.head.replicas.size)
   }

+  @Test
   def testAutoCreateTopic {
     // auto create topic
     val topic = "testAutoCreateTopic"
@@ -137,6 +141,7 @@ class TopicMetadataTest extends ZooKeeperTestHarness {
     assertTrue(partitionMetadata.head.leader.isDefined)
   }

+  @Test
   def testAutoCreateTopicWithCollision {
     // auto create topic
     val topic1 = "testAutoCreate_Topic"
@@ -199,7 +204,7 @@ class TopicMetadataTest extends ZooKeeperTestHarness {
     })
   }

-
+  @Test
   def testIsrAfterBrokerShutDownAndJoinsBack {
     val numBrokers = 2 //just 2 brokers are enough for the test

@@ -250,10 +255,12 @@ class TopicMetadataTest extends ZooKeeperTestHarness {
   }


+  @Test
   def testAliveBrokerListWithNoTopics {
     checkMetadata(Seq(server1), 1)
   }

+  @Test
   def testAliveBrokersListWithNoTopicsAfterNewBrokerStartup {
     var adHocServers = adHocConfigs.takeRight(adHocConfigs.size - 1).map(p => createServer(p))

@@ -267,6 +274,7 @@ class TopicMetadataTest extends ZooKeeperTestHarness {
   }


+  @Test
   def testAliveBrokersListWithNoTopicsAfterABrokerShutdown {
     val adHocServers = adHocConfigs.map(p => createServer(p))

@@ -18,7 +18,7 @@
 package kafka.integration

 import org.apache.kafka.common.config.ConfigException
-import org.junit.{After, Before}
+import org.junit.{Test, After, Before}

 import scala.util.Random
 import org.apache.log4j.{Level, Logger}
@@ -99,6 +99,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
     }
   }

+  @Test
   def testUncleanLeaderElectionEnabled {
     // unclean leader election is enabled by default
     startBrokers(Seq(configProps1, configProps2))
@@ -109,6 +110,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
     verifyUncleanLeaderElectionEnabled
   }

+  @Test
   def testUncleanLeaderElectionDisabled {
     // disable unclean leader election
     configProps1.put("unclean.leader.election.enable", String.valueOf(false))
@@ -121,6 +123,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
     verifyUncleanLeaderElectionDisabled
   }

+  @Test
   def testUncleanLeaderElectionEnabledByTopicOverride {
     // disable unclean leader election globally, but enable for our specific test topic
     configProps1.put("unclean.leader.election.enable", String.valueOf(false))
@@ -136,6 +139,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
     verifyUncleanLeaderElectionEnabled
   }

+  @Test
   def testCleanLeaderElectionDisabledByTopicOverride {
     // enable unclean leader election globally, but disable for our specific test topic
     configProps1.put("unclean.leader.election.enable", String.valueOf(true))
@@ -151,6 +155,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
     verifyUncleanLeaderElectionDisabled
   }

+  @Test
   def testUncleanLeaderElectionInvalidTopicOverride {
     startBrokers(Seq(configProps1))

@@ -29,6 +29,7 @@ import kafka.utils.{Logging, TestUtils}
 import kafka.consumer.{KafkaStream, ConsumerConfig}
 import kafka.zk.ZooKeeperTestHarness
 import kafka.common.MessageStreamsExistException
+import org.junit.Test

 import scala.collection.JavaConversions

@@ -50,6 +51,7 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with ZooKeeperTestHarness {
   val consumer1 = "consumer1"
   val nMessages = 2

+  @Test
   def testBasic() {
     val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
     requestHandlerLogger.setLevel(Level.FATAL)

@@ -21,7 +21,7 @@ import org.junit.Assert._
 import kafka.utils.{TestUtils, CoreUtils, ZkUtils}
 import kafka.zk.ZooKeeperTestHarness
 import org.apache.kafka.common.protocol.SecurityProtocol
-import org.junit.{After, Before}
+import org.junit.{Test, After, Before}

 class AdvertiseBrokerTest extends ZooKeeperTestHarness {
   var server : KafkaServer = null
@@ -46,7 +46,8 @@ class AdvertiseBrokerTest extends ZooKeeperTestHarness {
     CoreUtils.rm(server.config.logDirs)
     super.tearDown()
   }

+  @Test
   def testBrokerAdvertiseToZK {
     val brokerInfo = ZkUtils.getBrokerInfo(zkClient, brokerId)
     val endpoint = brokerInfo.get.endPoints.get(SecurityProtocol.PLAINTEXT).get

@@ -43,6 +43,7 @@ class HighwatermarkPersistenceTest {
     CoreUtils.rm(dir)
   }

+  @Test
   def testHighWatermarkPersistenceSinglePartition() {
     // mock zkclient
     val zkClient = EasyMock.createMock(classOf[ZkClient])
@@ -78,6 +79,7 @@ class HighwatermarkPersistenceTest {
     replicaManager.shutdown(false)
   }

+  @Test
   def testHighWatermarkPersistenceMultiplePartitions() {
     val topic1 = "foo1"
     val topic2 = "foo2"

@@ -18,7 +18,7 @@ package kafka.server

 import java.util.Properties

-import org.junit.{Before, After}
+import org.junit.{Test, Before, After}
 import collection.mutable.HashMap
 import collection.mutable.Map
 import kafka.cluster.{Partition, Replica}
@@ -59,6 +59,7 @@ class IsrExpirationTest {
   /*
    * Test the case where a follower is caught up but stops making requests to the leader. Once beyond the configured time limit, it should fall out of ISR
    */
+  @Test
   def testIsrExpirationForStuckFollowers() {
     val log = getLogWithLogEndOffset(15L, 2) // set logEndOffset for leader to 15L

@@ -89,6 +90,7 @@ class IsrExpirationTest {
   /*
    * Test the case where a follower never makes a fetch request. It should fall out of ISR because it will be declared stuck
    */
+  @Test
   def testIsrExpirationIfNoFetchRequestMade() {
     val log = getLogWithLogEndOffset(15L, 1) // set logEndOffset for leader to 15L

@@ -109,6 +111,7 @@ class IsrExpirationTest {
    * Test the case where a follower continually makes fetch requests but is unable to catch up. It should fall out of the ISR
    * However, any time it makes a request to the LogEndOffset it should be back in the ISR
    */
+  @Test
   def testIsrExpirationForSlowFollowers() {
     // create leader replica
     val log = getLogWithLogEndOffset(15L, 4)

@@ -26,7 +26,7 @@ import kafka.controller.{ControllerChannelManager, ControllerContext, LeaderIsrAndControllerEpoch}
 import kafka.utils.TestUtils._
 import kafka.zk.ZooKeeperTestHarness
 import org.apache.kafka.common.protocol.SecurityProtocol
-import org.junit.{After, Before}
+import org.junit.{Test, After, Before}

 class LeaderElectionTest extends ZooKeeperTestHarness {
   val brokerId1 = 0
@@ -56,6 +56,7 @@ class LeaderElectionTest extends ZooKeeperTestHarness {
     super.tearDown()
   }

+  @Test
   def testLeaderElectionAndEpoch {
     // start 2 brokers
     val topic = "new-topic"
@@ -101,6 +102,7 @@ class LeaderElectionTest extends ZooKeeperTestHarness {
     assertEquals("Second epoch value should be %d".format(leaderEpoch2+1) , leaderEpoch2+1, leaderEpoch3)
   }

+  @Test
   def testLeaderElectionWithStaleControllerEpoch() {
     // start 2 brokers
     val topic = "new-topic"

@@ -27,7 +27,7 @@ import kafka.serializer.StringEncoder

 import java.io.File

-import org.junit.{After, Before}
+import org.junit.{Test, After, Before}
 import org.junit.Assert._

 class LogRecoveryTest extends ZooKeeperTestHarness {
@@ -97,6 +97,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
     super.tearDown()
   }

+  @Test
   def testHWCheckpointNoFailuresSingleLogSegment {
     val numMessages = 2L
     sendMessages(numMessages.toInt)
@@ -113,6 +114,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
     assertEquals(numMessages, followerHW)
   }

+  @Test
   def testHWCheckpointWithFailuresSingleLogSegment {
     var leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId)

@@ -163,6 +165,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
     assertEquals(hw, hwFile2.read.getOrElse(TopicAndPartition(topic, 0), 0L))
   }

+  @Test
   def testHWCheckpointNoFailuresMultipleLogSegments {
     sendMessages(20)
     val hw = 20L
@@ -178,6 +181,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
     assertEquals(hw, followerHW)
   }

+  @Test
   def testHWCheckpointWithFailuresMultipleLogSegments {
     var leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId)

@@ -17,7 +17,7 @@

 package kafka.server

-import org.junit.{After, Before}
+import org.junit.{Test, After, Before}
 import kafka.zk.ZooKeeperTestHarness
 import kafka.utils.TestUtils._
 import kafka.producer.KeyedMessage
@@ -44,6 +44,7 @@ class ReplicaFetchTest extends ZooKeeperTestHarness {
     super.tearDown()
   }

+  @Test
   def testReplicaFetcherThread() {
     val partition = 0
     val testMessageList1 = List("test1", "test2", "test3", "test4")

@@ -159,6 +159,7 @@ class ServerShutdownTest extends ZooKeeperTestHarness {
       .count(isNonDaemonKafkaThread))
   }

+  @Test
   def testConsecutiveShutdown(){
     val server = new KafkaServer(config)
     try {

@@ -23,9 +23,11 @@ import kafka.utils.TestUtils

 import kafka.zk.ZooKeeperTestHarness
 import org.junit.Assert._
+import org.junit.Test

 class ServerStartupTest extends ZooKeeperTestHarness {

+  @Test
   def testBrokerCreatesZKChroot {
     val brokerId = 0
     val zookeeperChroot = "/kafka-chroot-for-unittest"
@@ -41,6 +43,7 @@ class ServerStartupTest extends ZooKeeperTestHarness {
     CoreUtils.rm(server.config.logDirs)
   }

+  @Test
   def testConflictBrokerRegistration {
     // Try starting a broker with the a conflicting broker id.
     // This shouldn't affect the existing broker registration.

@@ -22,7 +22,7 @@ import kafka.cluster.Replica
 import kafka.common.TopicAndPartition
 import kafka.log.Log
 import kafka.message.{MessageSet, ByteBufferMessageSet, Message}
-import org.junit.{After, Before}
+import org.junit.{Test, After, Before}

 import java.util.{Properties, Collections}
 import java.util.concurrent.atomic.AtomicBoolean
@@ -136,6 +136,7 @@ class SimpleFetchTest {
    *
    * This test also verifies counts of fetch requests recorded by the ReplicaManager
    */
+  @Test
   def testReadFromLog() {
     val initialTopicCount = BrokerTopicStats.getBrokerTopicStats(topic).totalFetchRequestRate.count();
     val initialAllTopicsCount = BrokerTopicStats.getBrokerAllTopicsStats().totalFetchRequestRate.count();

@@ -20,11 +20,12 @@ package kafka.zk
 import kafka.consumer.ConsumerConfig
 import kafka.utils.ZkUtils
 import kafka.utils.TestUtils
-import org.junit.Assert
+import org.junit.{Test, Assert}

 class ZKEphemeralTest extends ZooKeeperTestHarness {
   var zkSessionTimeoutMs = 1000

+  @Test
   def testEphemeralNodeCleanup = {
     val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1"))
     var zkClient = ZkUtils.createZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs)

@@ -21,6 +21,7 @@ import kafka.consumer.ConsumerConfig
 import kafka.utils.{ZkPath, TestUtils, ZkUtils}
 import org.apache.kafka.common.config.ConfigException
 import org.junit.Assert._
+import org.junit.Test

 class ZKPathTest extends ZooKeeperTestHarness {

@@ -28,6 +29,7 @@ class ZKPathTest extends ZooKeeperTestHarness {
   val zkSessionTimeoutMs = 1000
   def zkConnectWithInvalidRoot: String = zkConnect + "/ghost"

+  @Test
   def testCreatePersistentPathThrowsException {
     val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnectWithInvalidRoot,
       "test", "1"))
@@ -43,6 +45,7 @@ class ZKPathTest extends ZooKeeperTestHarness {
     }
   }

+  @Test
   def testCreatePersistentPath {
     val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1"))
     var zkClient = ZkUtils.createZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs)
@@ -56,6 +59,7 @@ class ZKPathTest extends ZooKeeperTestHarness {
     assertTrue("Failed to create persistent path", ZkUtils.pathExists(zkClient, path))
   }

+  @Test
   def testMakeSurePersistsPathExistsThrowsException {
     val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnectWithInvalidRoot,
       "test", "1"))
@@ -71,6 +75,7 @@ class ZKPathTest extends ZooKeeperTestHarness {
     }
   }

+  @Test
   def testMakeSurePersistsPathExists {
     val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1"))
     var zkClient = ZkUtils.createZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs)
@@ -84,6 +89,7 @@ class ZKPathTest extends ZooKeeperTestHarness {
     assertTrue("Failed to create persistent path", ZkUtils.pathExists(zkClient, path))
   }

+  @Test
   def testCreateEphemeralPathThrowsException {
     val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnectWithInvalidRoot,
       "test", "1"))
@@ -99,6 +105,7 @@ class ZKPathTest extends ZooKeeperTestHarness {
     }
   }

+  @Test
   def testCreateEphemeralPathExists {
     val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1"))
     var zkClient = ZkUtils.createZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs)
@@ -112,6 +119,7 @@ class ZKPathTest extends ZooKeeperTestHarness {
     assertTrue("Failed to create ephemeral path", ZkUtils.pathExists(zkClient, path))
   }

+  @Test
   def testCreatePersistentSequentialThrowsException {
     val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnectWithInvalidRoot,
       "test", "1"))
@@ -127,6 +135,7 @@ class ZKPathTest extends ZooKeeperTestHarness {
     }
   }

+  @Test
   def testCreatePersistentSequentialExists {
     val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1"))
     var zkClient = ZkUtils.createZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs)