diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala index 5c72cbf6532..dbb6213332e 100644 --- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala @@ -87,12 +87,12 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { super.tearDown() } - val serverCount = 3 + val brokerCount = 3 val consumerCount = 1 val producerCount = 1 override def generateConfigs = { - val cfgs = TestUtils.createBrokerConfigs(serverCount, zkConnect, interBrokerSecurityProtocol = Some(securityProtocol), + val cfgs = TestUtils.createBrokerConfigs(brokerCount, zkConnect, interBrokerSecurityProtocol = Some(securityProtocol), trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties, logDirCount = 2) cfgs.foreach { config => config.setProperty(KafkaConfig.ListenersProp, s"${listenerName.value}://localhost:${TestUtils.RandomPort}") @@ -197,7 +197,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { assertEquals(3, partition.replicas.size) partition.replicas.asScala.foreach { replica => assertTrue(replica.id >= 0) - assertTrue(replica.id < serverCount) + assertTrue(replica.id < brokerCount) } assertEquals("No duplicate replica ids", partition.replicas.size, partition.replicas.asScala.map(_.id).distinct.size) @@ -301,10 +301,10 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { val topic = "topic" val leaderByPartition = createTopic(topic, numPartitions = 10, replicationFactor = 1) val partitionsByBroker = leaderByPartition.groupBy { case (partitionId, leaderId) => leaderId }.mapValues(_.keys.toSeq) - val brokers = (0 until serverCount).map(Integer.valueOf) + val brokers = (0 until brokerCount).map(Integer.valueOf) val logDirInfosByBroker = client.describeLogDirs(brokers.asJava).all.get - (0 until serverCount).foreach { brokerId => + (0 until brokerCount).foreach { brokerId => val server = servers.find(_.config.brokerId == brokerId).get val expectedPartitions = partitionsByBroker(brokerId) val logDirInfos = logDirInfosByBroker.get(brokerId) @@ -361,7 +361,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { assertTrue(exception.getCause.isInstanceOf[UnknownTopicOrPartitionException]) } - createTopic(topic, numPartitions = 1, replicationFactor = serverCount) + createTopic(topic, numPartitions = 1, replicationFactor = brokerCount) servers.foreach { server => val logDir = server.logManager.getLog(tp).get.dir.getParent assertEquals(firstReplicaAssignment(new TopicPartitionReplica(topic, 0, server.config.brokerId)), logDir) @@ -759,7 +759,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { @Test def testSeekAfterDeleteRecords(): Unit = { - createTopic(topic, numPartitions = 2, replicationFactor = serverCount) + createTopic(topic, numPartitions = 2, replicationFactor = brokerCount) client = AdminClient.create(createConfig) @@ -788,7 +788,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { @Test def testLogStartOffsetCheckpoint(): Unit = { - createTopic(topic, numPartitions = 2, replicationFactor = serverCount) + createTopic(topic, numPartitions = 2, replicationFactor = brokerCount) client = AdminClient.create(createConfig) @@ -801,7 +801,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { var lowWatermark: Option[Long] = Some(result.lowWatermarks.get(topicPartition).get.lowWatermark) assertEquals(Some(5), lowWatermark) - for (i <- 0 until serverCount) { + for (i <- 0 until brokerCount) { killBroker(i) } restartDeadBrokers() @@ -828,7 +828,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { @Test def testLogStartOffsetAfterDeleteRecords(): Unit = { - createTopic(topic, numPartitions = 2, replicationFactor = serverCount) + createTopic(topic, numPartitions = 2, replicationFactor = brokerCount) client = AdminClient.create(createConfig) @@ -842,13 +842,13 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { val lowWatermark = result.lowWatermarks.get(topicPartition).get.lowWatermark assertEquals(3L, lowWatermark) - for (i <- 0 until serverCount) + for (i <- 0 until brokerCount) assertEquals(3, servers(i).replicaManager.localReplica(topicPartition).get.logStartOffset) } @Test def testReplicaCanFetchFromLogStartOffsetAfterDeleteRecords(): Unit = { - val leaders = createTopic(topic, numPartitions = 1, replicationFactor = serverCount) + val leaders = createTopic(topic, numPartitions = 1, replicationFactor = brokerCount) val followerIndex = if (leaders(0) != servers(0).config.brokerId) 0 else 1 def waitForFollowerLog(expectedStartOffset: Long, expectedEndOffset: Long): Unit = { @@ -881,7 +881,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { waitForFollowerLog(expectedStartOffset=3L, expectedEndOffset=100L) // after the new replica caught up, all replicas should have same log start offset - for (i <- 0 until serverCount) + for (i <- 0 until brokerCount) assertEquals(3, servers(i).replicaManager.localReplica(topicPartition).get.logStartOffset) // kill the same follower again, produce more records, and delete records beyond follower's LOE @@ -896,7 +896,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { @Test def testAlterLogDirsAfterDeleteRecords(): Unit = { client = AdminClient.create(createConfig) - createTopic(topic, numPartitions = 1, replicationFactor = serverCount) + createTopic(topic, numPartitions = 1, replicationFactor = brokerCount) val expectedLEO = 100 val producer = createProducer() sendRecords(producer, expectedLEO, topicPartition) @@ -905,7 +905,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { val result = client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(3L)).asJava) result.all().get() // make sure we are in the expected state after delete records - for (i <- 0 until serverCount) { + for (i <- 0 until brokerCount) { assertEquals(3, servers(i).replicaManager.localReplica(topicPartition).get.logStartOffset) assertEquals(expectedLEO, servers(i).replicaManager.localReplica(topicPartition).get.logEndOffset) } @@ -927,7 +927,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { @Test def testOffsetsForTimesAfterDeleteRecords(): Unit = { - createTopic(topic, numPartitions = 2, replicationFactor = serverCount) + createTopic(topic, numPartitions = 2, replicationFactor = brokerCount) client = AdminClient.create(createConfig) @@ -999,7 +999,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { @Test def testDescribeConfigsForTopic(): Unit = { - createTopic(topic, numPartitions = 2, replicationFactor = serverCount) + createTopic(topic, numPartitions = 2, replicationFactor = brokerCount) client = AdminClient.create(createConfig) val existingTopic = new ConfigResource(ConfigResource.Type.TOPIC, topic) diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index a7cb654b06a..336bfb19982 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -54,7 +54,7 @@ import scala.collection.mutable.Buffer class AuthorizerIntegrationTest extends BaseRequestTest { - override def numBrokers: Int = 1 + override def brokerCount: Int = 1 val brokerId: Integer = 0 def userPrincipal = KafkaPrincipal.ANONYMOUS @@ -103,7 +103,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { producerConfig.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "50000") consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, group) - override def propertyOverrides(properties: Properties): Unit = { + override def brokerPropertyOverrides(properties: Properties): Unit = { properties.put(KafkaConfig.AuthorizerClassNameProp, classOf[SimpleAclAuthorizer].getName) properties.put(KafkaConfig.BrokerIdProp, brokerId.toString) properties.put(KafkaConfig.OffsetsTopicPartitionsProp, "1") diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala index 1645b5badd6..6b8b5023add 100644 --- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala @@ -14,13 +14,14 @@ package kafka.api import java.time.Duration import java.util +import java.util.Properties import org.apache.kafka.clients.consumer._ import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord} import org.apache.kafka.common.record.TimestampType import org.apache.kafka.common.{PartitionInfo, TopicPartition} import kafka.utils.{ShutdownableThread, TestUtils} -import kafka.server.KafkaConfig +import kafka.server.{BaseRequestTest, KafkaConfig} import org.junit.Assert._ import org.junit.{Before, Test} @@ -30,43 +31,49 @@ import org.apache.kafka.clients.producer.KafkaProducer import org.apache.kafka.common.errors.WakeupException import org.apache.kafka.common.internals.Topic +import scala.collection.mutable + /** * Integration tests for the consumer that cover basic usage as well as server failures */ -abstract class BaseConsumerTest extends IntegrationTestHarness { +abstract class BaseConsumerTest extends BaseRequestTest { val epsilon = 0.1 - val serverCount = 3 + override def brokerCount: Int = 3 val topic = "topic" val part = 0 val tp = new TopicPartition(topic, part) val part2 = 1 val tp2 = new TopicPartition(topic, part2) + val group = "my-test" val producerClientId = "ConsumerTestProducer" val consumerClientId = "ConsumerTestConsumer" + val groupMaxSessionTimeoutMs = 30000L - // configure the servers and clients - this.serverConfig.setProperty(KafkaConfig.ControlledShutdownEnableProp, "false") // speed up shutdown - this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "3") // don't want to lose offset - this.serverConfig.setProperty(KafkaConfig.OffsetsTopicPartitionsProp, "1") - this.serverConfig.setProperty(KafkaConfig.GroupMinSessionTimeoutMsProp, "100") // set small enough session timeout - this.serverConfig.setProperty(KafkaConfig.GroupMaxSessionTimeoutMsProp, "30000") - this.serverConfig.setProperty(KafkaConfig.GroupInitialRebalanceDelayMsProp, "10") this.producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "all") this.producerConfig.setProperty(ProducerConfig.CLIENT_ID_CONFIG, producerClientId) this.consumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, consumerClientId) - this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-test") + this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, group) this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") this.consumerConfig.setProperty(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "100") + override protected def brokerPropertyOverrides(properties: Properties): Unit = { + properties.setProperty(KafkaConfig.ControlledShutdownEnableProp, "false") // speed up shutdown + properties.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "3") // don't want to lose offset + properties.setProperty(KafkaConfig.OffsetsTopicPartitionsProp, "1") + properties.setProperty(KafkaConfig.GroupMinSessionTimeoutMsProp, "100") // set small enough session timeout + properties.setProperty(KafkaConfig.GroupMaxSessionTimeoutMsProp, groupMaxSessionTimeoutMs.toString) + properties.setProperty(KafkaConfig.GroupInitialRebalanceDelayMsProp, "10") + } + @Before override def setUp() { super.setUp() // create the test topic with all the brokers as replicas - createTopic(topic, 2, serverCount) + createTopic(topic, 2, brokerCount) } @Test @@ -90,7 +97,7 @@ abstract class BaseConsumerTest extends IntegrationTestHarness { @Test def testCoordinatorFailover() { val listener = new TestConsumerReassignmentListener() - this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "5000") + this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "5001") this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "2000") val consumer = createConsumer() @@ -130,6 +137,12 @@ abstract class BaseConsumerTest extends IntegrationTestHarness { } } + protected def createConsumerWithGroupId(groupId: String): KafkaConsumer[Array[Byte], Array[Byte]] = { + val groupOverrideConfig = new Properties + groupOverrideConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId) + createConsumer(configOverrides = groupOverrideConfig) + } + protected def sendRecords(producer: KafkaProducer[Array[Byte], Array[Byte]], numRecords: Int, tp: TopicPartition): Seq[ProducerRecord[Array[Byte], Array[Byte]]] = { val records = (0 until numRecords).map { i => @@ -223,6 +236,100 @@ abstract class BaseConsumerTest extends IntegrationTestHarness { assertEquals(None, commitCallback.error) } + /** + * Create 'numOfConsumersToAdd' consumers add then to the consumer group 'consumerGroup', and create corresponding + * pollers for these consumers. Wait for partition re-assignment and validate. + * + * Currently, assignment validation requires that total number of partitions is greater or equal to + * number of consumers, so subscriptions.size must be greater or equal the resulting number of consumers in the group + * + * @param numOfConsumersToAdd number of consumers to create and add to the consumer group + * @param consumerGroup current consumer group + * @param consumerPollers current consumer pollers + * @param topicsToSubscribe topics to which new consumers will subscribe to + * @param subscriptions set of all topic partitions + */ + def addConsumersToGroupAndWaitForGroupAssignment(numOfConsumersToAdd: Int, + consumerGroup: mutable.Buffer[KafkaConsumer[Array[Byte], Array[Byte]]], + consumerPollers: mutable.Buffer[ConsumerAssignmentPoller], + topicsToSubscribe: List[String], + subscriptions: Set[TopicPartition], + group: String = group): (mutable.Buffer[KafkaConsumer[Array[Byte], Array[Byte]]], mutable.Buffer[ConsumerAssignmentPoller]) = { + assertTrue(consumerGroup.size + numOfConsumersToAdd <= subscriptions.size) + addConsumersToGroup(numOfConsumersToAdd, consumerGroup, consumerPollers, topicsToSubscribe, subscriptions, group) + // wait until topics get re-assigned and validate assignment + validateGroupAssignment(consumerPollers, subscriptions) + + (consumerGroup, consumerPollers) + } + + /** + * Create 'numOfConsumersToAdd' consumers add then to the consumer group 'consumerGroup', and create corresponding + * pollers for these consumers. + * + * + * @param numOfConsumersToAdd number of consumers to create and add to the consumer group + * @param consumerGroup current consumer group + * @param consumerPollers current consumer pollers + * @param topicsToSubscribe topics to which new consumers will subscribe to + * @param subscriptions set of all topic partitions + */ + def addConsumersToGroup(numOfConsumersToAdd: Int, + consumerGroup: mutable.Buffer[KafkaConsumer[Array[Byte], Array[Byte]]], + consumerPollers: mutable.Buffer[ConsumerAssignmentPoller], + topicsToSubscribe: List[String], + subscriptions: Set[TopicPartition], + group: String = group): (mutable.Buffer[KafkaConsumer[Array[Byte], Array[Byte]]], mutable.Buffer[ConsumerAssignmentPoller]) = { + for (_ <- 0 until numOfConsumersToAdd) { + val consumer = createConsumerWithGroupId(group) + consumerGroup += consumer + consumerPollers += subscribeConsumerAndStartPolling(consumer, topicsToSubscribe) + } + + (consumerGroup, consumerPollers) + } + + /** + * Wait for consumers to get partition assignment and validate it. + * + * @param consumerPollers consumer pollers corresponding to the consumer group we are testing + * @param subscriptions set of all topic partitions + * @param msg message to print when waiting for/validating assignment fails + */ + def validateGroupAssignment(consumerPollers: mutable.Buffer[ConsumerAssignmentPoller], + subscriptions: Set[TopicPartition], + msg: Option[String] = None, + waitTime: Long = 10000L): Unit = { + val assignments = mutable.Buffer[Set[TopicPartition]]() + TestUtils.waitUntilTrue(() => { + assignments.clear() + consumerPollers.foreach(assignments += _.consumerAssignment()) + isPartitionAssignmentValid(assignments, subscriptions) + }, msg.getOrElse(s"Did not get valid assignment for partitions $subscriptions. Instead, got $assignments"), waitTime) + } + + /** + * Subscribes consumer 'consumer' to a given list of topics 'topicsToSubscribe', creates + * consumer poller and starts polling. + * Assumes that the consumer is not subscribed to any topics yet + * + * @param consumer consumer + * @param topicsToSubscribe topics that this consumer will subscribe to + * @return consumer poller for the given consumer + */ + def subscribeConsumerAndStartPolling(consumer: Consumer[Array[Byte], Array[Byte]], + topicsToSubscribe: List[String], + partitionsToAssign: Set[TopicPartition] = Set.empty[TopicPartition]): ConsumerAssignmentPoller = { + assertEquals(0, consumer.assignment().size) + val consumerPoller = if (topicsToSubscribe.nonEmpty) + new ConsumerAssignmentPoller(consumer, topicsToSubscribe) + else + new ConsumerAssignmentPoller(consumer, partitionsToAssign) + + consumerPoller.start() + consumerPoller + } + protected def awaitRebalance(consumer: Consumer[_, _], rebalanceListener: TestConsumerReassignmentListener): Unit = { val numReassignments = rebalanceListener.callsToAssigned TestUtils.pollUntilTrue(consumer, () => rebalanceListener.callsToAssigned > numReassignments, @@ -253,13 +360,25 @@ abstract class BaseConsumerTest extends IntegrationTestHarness { } protected class ConsumerAssignmentPoller(consumer: Consumer[Array[Byte], Array[Byte]], - topicsToSubscribe: List[String]) extends ShutdownableThread("daemon-consumer-assignment", false) + topicsToSubscribe: List[String], + partitionsToAssign: Set[TopicPartition]) extends ShutdownableThread("daemon-consumer-assignment", false) { - @volatile private var partitionAssignment: Set[TopicPartition] = Set.empty[TopicPartition] - private var topicsSubscription = topicsToSubscribe - @volatile private var subscriptionChanged = false + def this(consumer: Consumer[Array[Byte], Array[Byte]], topicsToSubscribe: List[String]) { + this(consumer, topicsToSubscribe, Set.empty[TopicPartition]) + } - val rebalanceListener = new ConsumerRebalanceListener { + def this(consumer: Consumer[Array[Byte], Array[Byte]], partitionsToAssign: Set[TopicPartition]) { + this(consumer, List.empty[String], partitionsToAssign) + } + + @volatile var thrownException: Option[Throwable] = None + @volatile var receivedMessages = 0 + + @volatile private var partitionAssignment: Set[TopicPartition] = partitionsToAssign + @volatile private var subscriptionChanged = false + private var topicsSubscription = topicsToSubscribe + + val rebalanceListener: ConsumerRebalanceListener = new ConsumerRebalanceListener { override def onPartitionsAssigned(partitions: util.Collection[TopicPartition]) = { partitionAssignment = collection.immutable.Set(consumer.assignment().asScala.toArray: _*) } @@ -268,7 +387,11 @@ abstract class BaseConsumerTest extends IntegrationTestHarness { partitionAssignment = Set.empty[TopicPartition] } } - consumer.subscribe(topicsToSubscribe.asJava, rebalanceListener) + if (partitionAssignment.isEmpty) { + consumer.subscribe(topicsToSubscribe.asJava, rebalanceListener) + } else { + consumer.assign(partitionAssignment.asJava) + } def consumerAssignment(): Set[TopicPartition] = { partitionAssignment @@ -285,14 +408,16 @@ abstract class BaseConsumerTest extends IntegrationTestHarness { * @param newTopicsToSubscribe */ def subscribe(newTopicsToSubscribe: List[String]): Unit = { - if (subscriptionChanged) { + if (subscriptionChanged) throw new IllegalStateException("Do not call subscribe until the previous subscribe request is processed.") - } + if (partitionsToAssign.nonEmpty) + throw new IllegalStateException("Cannot call subscribe when configured to use manual partition assignment") + topicsSubscription = newTopicsToSubscribe subscriptionChanged = true } - def isSubscribeRequestProcessed(): Boolean = { + def isSubscribeRequestProcessed: Boolean = { !subscriptionChanged } @@ -308,9 +433,12 @@ abstract class BaseConsumerTest extends IntegrationTestHarness { subscriptionChanged = false } try { - consumer.poll(Duration.ofMillis(50)) + receivedMessages += consumer.poll(Duration.ofMillis(50)).count() } catch { case _: WakeupException => // ignore for shutdown + case e: Throwable => + thrownException = Some(e) + throw e } } } diff --git a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala index 4b278f08bb0..d9bc646410f 100644 --- a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala +++ b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala @@ -33,7 +33,7 @@ import scala.collection.JavaConverters._ abstract class BaseQuotaTest extends IntegrationTestHarness { - override val serverCount = 2 + override val brokerCount = 2 protected def producerClientId = "QuotasTestProducer-1" protected def consumerClientId = "QuotasTestConsumer-1" @@ -70,7 +70,7 @@ abstract class BaseQuotaTest extends IntegrationTestHarness { super.setUp() val numPartitions = 1 - val leaders = createTopic(topic1, numPartitions, serverCount) + val leaders = createTopic(topic1, numPartitions, brokerCount) leaderNode = if (leaders(0) == servers.head.config.brokerId) servers.head else servers(1) followerNode = if (leaders(0) != servers.head.config.brokerId) servers.head else servers(1) quotaTestClients = createQuotaTestClients(topic1, leaderNode) diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala index 1a1f37ee5c5..38650b4d3de 100644 --- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala +++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala @@ -15,12 +15,10 @@ package kafka.api import java.time import java.util.concurrent._ -import java.util.concurrent.atomic.AtomicBoolean -import java.util.concurrent.locks.ReentrantLock import java.util.{Collection, Collections, Properties} import util.control.Breaks._ -import kafka.server.{BaseRequestTest, KafkaConfig} +import kafka.server.KafkaConfig import kafka.utils.{CoreUtils, Logging, ShutdownableThread, TestUtils} import org.apache.kafka.clients.consumer._ import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} @@ -29,25 +27,23 @@ import org.apache.kafka.common.errors.GroupMaxSizeReachedException import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.requests.{FindCoordinatorRequest, FindCoordinatorResponse} import org.junit.Assert._ -import org.junit.{After, Before, Ignore, Test} +import org.junit.{After, Ignore, Test} import scala.collection.JavaConverters._ -import scala.collection.mutable.ArrayBuffer -import scala.concurrent.duration.Duration -import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutor, Future => SFuture} +import scala.collection.mutable /** * Integration tests for the consumer that cover basic usage as well as server failures */ -class ConsumerBounceTest extends BaseRequestTest with Logging { - val topic = "topic" - val part = 0 - val tp = new TopicPartition(topic, part) +class ConsumerBounceTest extends BaseConsumerTest with Logging { val maxGroupSize = 5 // Time to process commit and leave group requests in tests when brokers are available - val gracefulCloseTimeMs = 1000 - val executor = Executors.newScheduledThreadPool(2) + val gracefulCloseTimeMs = Some(1000L) + val executor: ScheduledExecutorService = Executors.newScheduledThreadPool(2) + val consumerPollers: mutable.Buffer[ConsumerAssignmentPoller] = mutable.Buffer[ConsumerAssignmentPoller]() + + this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true") override def generateConfigs = { generateKafkaConfigs() @@ -63,21 +59,14 @@ class ConsumerBounceTest extends BaseRequestTest with Logging { properties.put(KafkaConfig.UncleanLeaderElectionEnableProp, "true") properties.put(KafkaConfig.AutoCreateTopicsEnableProp, "false") - FixedPortTestUtils.createBrokerConfigs(numBrokers, zkConnect, enableControlledShutdown = false) + FixedPortTestUtils.createBrokerConfigs(brokerCount, zkConnect, enableControlledShutdown = false) .map(KafkaConfig.fromProps(_, properties)) } - @Before - override def setUp() { - super.setUp() - - // create the test topic with all the brokers as replicas - createTopic(topic, 1, numBrokers) - } - @After override def tearDown() { try { + consumerPollers.foreach(_.shutdown()) executor.shutdownNow() // Wait for any active tasks to terminate to ensure consumer is not closed while being used from another thread assertTrue("Executor did not terminate", executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) @@ -176,7 +165,7 @@ class ConsumerBounceTest extends BaseRequestTest with Logging { val consumer = createConsumer() consumer.subscribe(Collections.singleton(newtopic)) executor.schedule(new Runnable { - def run() = createTopic(newtopic, numPartitions = numBrokers, replicationFactor = numBrokers) + def run() = createTopic(newtopic, numPartitions = brokerCount, replicationFactor = brokerCount) }, 2, TimeUnit.SECONDS) consumer.poll(time.Duration.ZERO) @@ -201,18 +190,22 @@ class ConsumerBounceTest extends BaseRequestTest with Logging { assertEquals(0, remainingRecords) } + val poller = new ConsumerAssignmentPoller(consumer, List(newtopic)) + consumerPollers += poller + poller.start() sendRecords(numRecords, newtopic) - receiveRecords(consumer, numRecords, 10000) + receiveExactRecords(poller, numRecords, 10000) + poller.shutdown() servers.foreach(server => killBroker(server.config.brokerId)) Thread.sleep(500) restartDeadBrokers() - val future = executor.submit(new Runnable { - def run() = receiveRecords(consumer, numRecords, 10000) - }) + val poller2 = new ConsumerAssignmentPoller(consumer, List(newtopic)) + consumerPollers += poller2 + poller2.start() sendRecords(numRecords, newtopic) - future.get + receiveExactRecords(poller, numRecords, 10000L) } @Test @@ -233,7 +226,7 @@ class ConsumerBounceTest extends BaseRequestTest with Logging { */ private def checkCloseGoodPath(numRecords: Int, groupId: String) { val consumer = createConsumerAndReceive(groupId, false, numRecords) - val future = submitCloseAndValidate(consumer, Long.MaxValue, None, Some(gracefulCloseTimeMs)) + val future = submitCloseAndValidate(consumer, Long.MaxValue, None, gracefulCloseTimeMs) future.get checkClosedState(groupId, numRecords) } @@ -251,8 +244,8 @@ class ConsumerBounceTest extends BaseRequestTest with Logging { killBroker(findCoordinator(dynamicGroup)) killBroker(findCoordinator(manualGroup)) - val future1 = submitCloseAndValidate(consumer1, Long.MaxValue, None, Some(gracefulCloseTimeMs)) - val future2 = submitCloseAndValidate(consumer2, Long.MaxValue, None, Some(gracefulCloseTimeMs)) + val future1 = submitCloseAndValidate(consumer1, Long.MaxValue, None, gracefulCloseTimeMs) + val future2 = submitCloseAndValidate(consumer2, Long.MaxValue, None, gracefulCloseTimeMs) future1.get future2.get @@ -302,6 +295,7 @@ class ConsumerBounceTest extends BaseRequestTest with Logging { */ @Test def testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup(): Unit = { + val group = "group-max-size-test" val topic = "group-max-size-test" val maxGroupSize = 2 val consumerCount = maxGroupSize + 1 @@ -311,77 +305,51 @@ class ConsumerBounceTest extends BaseRequestTest with Logging { // ensure even record distribution per partition recordsProduced += partitionCount - recordsProduced % partitionCount } - val executor = Executors.newScheduledThreadPool(consumerCount * 2) this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "60000") this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000") this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") - val producer = createProducer() - createTopic(topic, numPartitions = partitionCount, replicationFactor = numBrokers) - val stableConsumers = createConsumersWithGroupId("group2", consumerCount, executor, topic = topic) + val partitions = createTopicPartitions(topic, numPartitions = partitionCount, replicationFactor = brokerCount) - // assert group is stable and working - sendRecords(producer, recordsProduced, topic, numPartitions = Some(partitionCount)) - stableConsumers.foreach { cons => { - receiveAndCommit(cons, recordsProduced / consumerCount, 10000) - }} + addConsumersToGroupAndWaitForGroupAssignment(consumerCount, mutable.Buffer[KafkaConsumer[Array[Byte], Array[Byte]]](), + consumerPollers, List[String](topic), partitions, group) // roll all brokers with a lesser max group size to make sure coordinator has the new config val newConfigs = generateKafkaConfigs(maxGroupSize.toString) - val kickedConsumerOut = new AtomicBoolean(false) var kickedOutConsumerIdx: Option[Int] = None - val lock = new ReentrantLock // restart brokers until the group moves to a Coordinator with the new config breakable { for (broker <- servers.indices) { killBroker(broker) - sendRecords(producer, recordsProduced, topic, numPartitions = Some(partitionCount)) - - var successfulConsumes = 0 - - // compute consumptions in a non-blocking way in order to account for the rebalance once the group.size takes effect - val consumeFutures = new ArrayBuffer[SFuture[Any]] - implicit val executorContext: ExecutionContextExecutor = ExecutionContext.fromExecutor(executor) - stableConsumers.indices.foreach(idx => { - val currentConsumer = stableConsumers(idx) - val consumeFuture = SFuture { - try { - receiveAndCommit(currentConsumer, recordsProduced / consumerCount, 10000) - CoreUtils.inLock(lock) { successfulConsumes += 1 } - } catch { - case e: Throwable => - if (!e.isInstanceOf[GroupMaxSizeReachedException]) { - throw e - } - if (!kickedConsumerOut.compareAndSet(false, true)) { - fail(s"Received more than one ${classOf[GroupMaxSizeReachedException]}") - } - kickedOutConsumerIdx = Some(idx) - } + consumerPollers.indices.foreach(idx => { + consumerPollers(idx).thrownException match { + case Some(thrownException) => + if (!thrownException.isInstanceOf[GroupMaxSizeReachedException]) { + throw thrownException + } + if (kickedOutConsumerIdx.isDefined) { + fail(s"Received more than one ${classOf[GroupMaxSizeReachedException]}") + } + kickedOutConsumerIdx = Some(idx) + case None => } - - consumeFutures += consumeFuture }) - Await.result(SFuture.sequence(consumeFutures), Duration("12sec")) - if (kickedConsumerOut.get()) { - // validate the rest N-1 consumers consumed successfully - assertEquals(maxGroupSize, successfulConsumes) + if (kickedOutConsumerIdx.isDefined) break - } val config = newConfigs(broker) servers(broker) = TestUtils.createServer(config, time = brokerTime(config.brokerId)) restartDeadBrokers() }} - if (!kickedConsumerOut.get()) + if (kickedOutConsumerIdx.isEmpty) fail(s"Should have received an ${classOf[GroupMaxSizeReachedException]} during the cluster roll") + restartDeadBrokers() // assert that the group has gone through a rebalance and shed off one consumer - stableConsumers.remove(kickedOutConsumerIdx.get) - sendRecords(producer, recordsProduced, topic, numPartitions = Some(partitionCount)) - // should be only maxGroupSize consumers left in the group - stableConsumers.foreach { cons => { - receiveAndCommit(cons, recordsProduced / maxGroupSize, 10000) - }} + consumerPollers.remove(kickedOutConsumerIdx.get).shutdown() + sendRecords(createProducer(), recordsProduced, topic, numPartitions = Some(partitionCount)) + TestUtils.waitUntilTrue(() => { + consumerPollers.forall(p => p.receivedMessages >= recordsProduced / consumerCount) + }, "The remaining consumers in the group could not fetch the expected records", 10000L) } /** @@ -389,70 +357,30 @@ class ConsumerBounceTest extends BaseRequestTest with Logging { */ @Test def testConsumerReceivesFatalExceptionWhenGroupPassesMaxSize(): Unit = { - val topic = "group-max-size-test" - val groupId = "group1" - val executor = Executors.newScheduledThreadPool(maxGroupSize * 2) - createTopic(topic, maxGroupSize, numBrokers) + val group = "fatal-exception-test" + val topic = "fatal-exception-test" this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "60000") this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000") this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") + val partitions = createTopicPartitions(topic, numPartitions = maxGroupSize, replicationFactor = brokerCount) + // Create N+1 consumers in the same consumer group and assert that the N+1th consumer receives a fatal error when it tries to join the group - val stableConsumers = createConsumersWithGroupId(groupId, maxGroupSize, executor, topic) - val newConsumer = createConsumerWithGroupId(groupId) - var failedRebalance = false - var exception: Exception = null - waitForRebalance(5000, subscribeAndPoll(newConsumer, executor = executor, onException = e => {failedRebalance = true; exception = e}), - executor = executor, stableConsumers:_*) - assertTrue("Rebalance did not fail as expected", failedRebalance) - assertTrue(exception.isInstanceOf[GroupMaxSizeReachedException]) + addConsumersToGroupAndWaitForGroupAssignment(maxGroupSize, mutable.Buffer[KafkaConsumer[Array[Byte], Array[Byte]]](), + consumerPollers, List[String](topic), partitions, group) + val (_, rejectedConsumerPollers) = addConsumersToGroup(1, + mutable.Buffer[KafkaConsumer[Array[Byte], Array[Byte]]](), mutable.Buffer[ConsumerAssignmentPoller](), List[String](topic), partitions, group) + val rejectedConsumer = rejectedConsumerPollers.head + TestUtils.waitUntilTrue(() => { + rejectedConsumer.thrownException.isDefined + }, "Extra consumer did not throw an exception") + assertTrue(rejectedConsumer.thrownException.get.isInstanceOf[GroupMaxSizeReachedException]) // assert group continues to live - val producer = createProducer() - sendRecords(producer, maxGroupSize * 100, topic, numPartitions = Some(maxGroupSize)) - stableConsumers.foreach { cons => { - receiveExactRecords(cons, 100, 10000) - }} - } - - /** - * Creates N consumers with the same group ID and ensures the group rebalances properly at each step - */ - private def createConsumersWithGroupId(groupId: String, consumerCount: Int, executor: ExecutorService, topic: String): ArrayBuffer[KafkaConsumer[Array[Byte], Array[Byte]]] = { - val stableConsumers = ArrayBuffer[KafkaConsumer[Array[Byte], Array[Byte]]]() - for (_ <- 1.to(consumerCount)) { - val newConsumer = createConsumerWithGroupId(groupId) - waitForRebalance(5000, subscribeAndPoll(newConsumer, executor = executor, topic = topic), - executor = executor, stableConsumers:_*) - stableConsumers += newConsumer - } - stableConsumers - } - - def subscribeAndPoll(consumer: KafkaConsumer[Array[Byte], Array[Byte]], executor: ExecutorService, revokeSemaphore: Option[Semaphore] = None, - onException: Exception => Unit = e => { throw e }, topic: String = topic, pollTimeout: Int = 1000): Future[Any] = { - executor.submit(CoreUtils.runnable { - try { - consumer.subscribe(Collections.singletonList(topic)) - consumer.poll(java.time.Duration.ofMillis(pollTimeout)) - } catch { - case e: Exception => onException.apply(e) - } - }, 0) - } - - def waitForRebalance(timeoutMs: Long, future: Future[Any], executor: ExecutorService, otherConsumers: KafkaConsumer[Array[Byte], Array[Byte]]*) { - val startMs = System.currentTimeMillis - implicit val executorContext: ExecutionContextExecutor = ExecutionContext.fromExecutor(executor) - - while (System.currentTimeMillis < startMs + timeoutMs && !future.isDone) { - val consumeFutures = otherConsumers.map(consumer => SFuture { - consumer.poll(time.Duration.ofMillis(1000)) - }) - Await.result(SFuture.sequence(consumeFutures), Duration("1500ms")) - } - - assertTrue("Rebalance did not complete in time", future.isDone) + sendRecords(createProducer(), maxGroupSize * 100, topic, numPartitions = Some(partitions.size)) + TestUtils.waitUntilTrue(() => { + consumerPollers.forall(p => p.receivedMessages >= 100) + }, "The consumers in the group could not fetch the expected records", 10000L) } /** @@ -463,7 +391,7 @@ class ConsumerBounceTest extends BaseRequestTest with Logging { @Test def testCloseDuringRebalance() { val topic = "closetest" - createTopic(topic, 10, numBrokers) + createTopic(topic, 10, brokerCount) this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "60000") this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000") this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") @@ -510,7 +438,7 @@ class ConsumerBounceTest extends BaseRequestTest with Logging { val rebalanceFuture = createConsumerToRebalance() // consumer1 should leave group and close immediately even though rebalance is in progress - val closeFuture1 = submitCloseAndValidate(consumer1, Long.MaxValue, None, Some(gracefulCloseTimeMs)) + val closeFuture1 = submitCloseAndValidate(consumer1, Long.MaxValue, None, gracefulCloseTimeMs) // Rebalance should complete without waiting for consumer1 to timeout since consumer1 has left the group waitForRebalance(2000, rebalanceFuture, consumer2) @@ -528,40 +456,22 @@ class ConsumerBounceTest extends BaseRequestTest with Logging { closeFuture2.get(2000, TimeUnit.MILLISECONDS) } - private def createConsumerWithGroupId(groupId: String): KafkaConsumer[Array[Byte], Array[Byte]] = { - consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId) - createConsumer() - } - private def createConsumerAndReceive(groupId: String, manualAssign: Boolean, numRecords: Int): KafkaConsumer[Array[Byte], Array[Byte]] = { val consumer = createConsumerWithGroupId(groupId) - if (manualAssign) - consumer.assign(Collections.singleton(tp)) - else - consumer.subscribe(Collections.singleton(topic)) - receiveExactRecords(consumer, numRecords) + val consumerPoller = if (manualAssign) + subscribeConsumerAndStartPolling(consumer, List(), Set(tp)) + else + subscribeConsumerAndStartPolling(consumer, List(topic)) + + receiveExactRecords(consumerPoller, numRecords) + consumerPoller.shutdown() consumer } - private def receiveRecords(consumer: KafkaConsumer[Array[Byte], Array[Byte]], numRecords: Int, timeoutMs: Long = 60000): Long = { - var received = 0L - val endTimeMs = System.currentTimeMillis + timeoutMs - while (received < numRecords && System.currentTimeMillis < endTimeMs) - received += consumer.poll(time.Duration.ofMillis(100)).count() - - received - } - - private def receiveExactRecords(consumer: KafkaConsumer[Array[Byte], Array[Byte]], numRecords: Int, timeoutMs: Long = 60000): Unit = { - val received = receiveRecords(consumer, numRecords, timeoutMs) - assertEquals(numRecords, received) - } - - @throws(classOf[CommitFailedException]) - private def receiveAndCommit(consumer: KafkaConsumer[Array[Byte], Array[Byte]], numRecords: Int, timeoutMs: Long): Unit = { - val received = receiveRecords(consumer, numRecords, timeoutMs) - assertTrue(s"Received $received, expected at least $numRecords", numRecords <= received) - consumer.commitSync() + private def receiveExactRecords(consumer: ConsumerAssignmentPoller, numRecords: Int, timeoutMs: Long = 60000): Unit = { + TestUtils.waitUntilTrue(() => { + consumer.receivedMessages == numRecords + }, s"Consumer did not receive expected $numRecords. It received ${consumer.receivedMessages}", timeoutMs) } private def submitCloseAndValidate(consumer: KafkaConsumer[Array[Byte], Array[Byte]], @@ -617,6 +527,12 @@ class ConsumerBounceTest extends BaseRequestTest with Logging { } } + private def createTopicPartitions(topic: String, numPartitions: Int = 1, replicationFactor: Int = 1, + topicConfig: Properties = new Properties): Set[TopicPartition] = { + createTopic(topic, numPartitions = numPartitions, replicationFactor = replicationFactor, topicConfig = topicConfig) + Range(0, numPartitions).map(part => new TopicPartition(topic, part)).toSet + } + private def sendRecords(producer: KafkaProducer[Array[Byte], Array[Byte]], numRecords: Int, topic: String = this.topic, diff --git a/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala b/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala index 8c1d34ddc2c..1394c83c1d8 100644 --- a/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala +++ b/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala @@ -48,7 +48,7 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup { override protected def interBrokerListenerName: ListenerName = new ListenerName("BROKER") override protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks")) - override val serverCount: Int = 2 + override val brokerCount: Int = 2 private val kafkaServerSaslMechanisms = Seq("SCRAM-SHA-256") private val kafkaClientSaslMechanism = "SCRAM-SHA-256" @@ -161,7 +161,7 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup { user.waitForQuotaUpdate(8000, 2500, defaultRequestQuota) user.produceConsume(expectProduceThrottle = true, expectConsumeThrottle = true) - assertEquals(serverCount, callbackInstances.get) + assertEquals(brokerCount, callbackInstances.get) } /** diff --git a/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala b/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala index 78fc215e38b..57fd552bee2 100644 --- a/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala +++ b/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala @@ -30,7 +30,7 @@ import org.junit.{After, Before, Test} import scala.collection.JavaConverters._ class DescribeAuthorizedOperationsTest extends IntegrationTestHarness with SaslSetup { - override val serverCount = 1 + override val brokerCount = 1 this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true") this.serverConfig.setProperty(KafkaConfig.AuthorizerClassNameProp, classOf[SimpleAclAuthorizer].getName) diff --git a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala index 0587a6de160..49977d016ac 100644 --- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala +++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala @@ -58,7 +58,7 @@ import scala.collection.JavaConverters._ * would end up with ZooKeeperTestHarness twice. */ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup { - override val serverCount = 3 + override val brokerCount = 3 override def configureSecurityBeforeServersStart() { AclCommand.main(clusterActionArgs) diff --git a/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala index 27c3f31c3ff..6d2161d52e3 100644 --- a/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala @@ -33,9 +33,9 @@ class GroupAuthorizerIntegrationTest extends AuthorizerIntegrationTest { override val kafkaPrincipalType = GroupPrincipalType override def userPrincipal = TestGroupPrincipal - override def propertyOverrides(properties: Properties): Unit = { + override def brokerPropertyOverrides(properties: Properties): Unit = { properties.setProperty(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, classOf[GroupPrincipalBuilder].getName) - super.propertyOverrides(properties) + super.brokerPropertyOverrides(properties) } } \ No newline at end of file diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala index 39f47ab2af0..5ffbc43acad 100644 --- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala +++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala @@ -37,7 +37,7 @@ import scala.collection.mutable * A helper class for writing integration tests that involve producers, consumers, and servers */ abstract class IntegrationTestHarness extends KafkaServerTestHarness { - protected def serverCount: Int + protected def brokerCount: Int protected def logDirCount: Int = 1 val producerConfig = new Properties @@ -49,10 +49,20 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness { protected def interBrokerListenerName: ListenerName = listenerName + protected def modifyConfigs(props: Seq[Properties]): Unit = { + configureListeners(props) + props.foreach(_ ++= serverConfig) + } + override def generateConfigs: Seq[KafkaConfig] = { - val cfgs = TestUtils.createBrokerConfigs(serverCount, zkConnect, interBrokerSecurityProtocol = Some(securityProtocol), + val cfgs = TestUtils.createBrokerConfigs(brokerCount, zkConnect, interBrokerSecurityProtocol = Some(securityProtocol), trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties, logDirCount = logDirCount) - cfgs.foreach { config => + modifyConfigs(cfgs) + cfgs.map(KafkaConfig.fromProps) + } + + protected def configureListeners(props: Seq[Properties]): Unit = { + props.foreach { config => config.remove(KafkaConfig.InterBrokerSecurityProtocolProp) config.setProperty(KafkaConfig.InterBrokerListenerNameProp, interBrokerListenerName.value) @@ -63,8 +73,6 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness { config.setProperty(KafkaConfig.ListenersProp, listeners) config.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, listenerSecurityMap) } - cfgs.foreach(_ ++= serverConfig) - cfgs.map(KafkaConfig.fromProps) } @Before diff --git a/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala b/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala index 08a0224704d..b30b3639c05 100644 --- a/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala +++ b/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala @@ -41,7 +41,7 @@ class LegacyAdminClientTest extends IntegrationTestHarness with Logging { val producerCount = 1 val consumerCount = 2 - val serverCount = 3 + val brokerCount = 3 val groupId = "my-test" val clientId = "consumer-498" @@ -70,7 +70,7 @@ class LegacyAdminClientTest extends IntegrationTestHarness with Logging { override def setUp() { super.setUp() client = AdminClient.createSimplePlaintext(this.brokerList) - createTopic(topic, 2, serverCount) + createTopic(topic, 2, brokerCount) } @After diff --git a/core/src/test/scala/integration/kafka/api/LogAppendTimeTest.scala b/core/src/test/scala/integration/kafka/api/LogAppendTimeTest.scala index 795f954a32d..8d2e66eaa24 100644 --- a/core/src/test/scala/integration/kafka/api/LogAppendTimeTest.scala +++ b/core/src/test/scala/integration/kafka/api/LogAppendTimeTest.scala @@ -33,7 +33,7 @@ import org.junit.Assert.{assertEquals, assertNotEquals, assertTrue} class LogAppendTimeTest extends IntegrationTestHarness { val producerCount: Int = 1 val consumerCount: Int = 1 - val serverCount: Int = 2 + val brokerCount: Int = 2 // This will be used for the offsets topic as well serverConfig.put(KafkaConfig.LogMessageTimestampTypeProp, TimestampType.LOG_APPEND_TIME.name) diff --git a/core/src/test/scala/integration/kafka/api/MetricsTest.scala b/core/src/test/scala/integration/kafka/api/MetricsTest.scala index b2f5a7ea3b9..da466b8ab48 100644 --- a/core/src/test/scala/integration/kafka/api/MetricsTest.scala +++ b/core/src/test/scala/integration/kafka/api/MetricsTest.scala @@ -33,7 +33,7 @@ import scala.collection.JavaConverters._ class MetricsTest extends IntegrationTestHarness with SaslSetup { - override val serverCount = 1 + override val brokerCount = 1 override protected def listenerName = new ListenerName("CLIENT") private val kafkaClientSaslMechanism = "PLAIN" diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala index c11fc12d16d..e3251a56bd4 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala @@ -19,7 +19,6 @@ import java.util.regex.Pattern import java.util.{Collections, Locale, Optional, Properties} import kafka.log.LogConfig -import kafka.server.KafkaConfig import kafka.utils.TestUtils import org.apache.kafka.clients.consumer._ import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord} @@ -38,6 +37,8 @@ import scala.collection.mutable.Buffer import kafka.server.QuotaType import kafka.server.KafkaServer +import scala.collection.mutable + /* We have some tests in this class instead of `BaseConsumerTest` in order to keep the build time under control. */ class PlaintextConsumerTest extends BaseConsumerTest { @@ -342,17 +343,17 @@ class PlaintextConsumerTest extends BaseConsumerTest { sendRecords(producer, numRecords, tp) val topic1 = "tblablac" // matches subscribed pattern - createTopic(topic1, 2, serverCount) + createTopic(topic1, 2, brokerCount) sendRecords(producer, numRecords = 1000, new TopicPartition(topic1, 0)) sendRecords(producer, numRecords = 1000, new TopicPartition(topic1, 1)) val topic2 = "tblablak" // does not match subscribed pattern - createTopic(topic2, 2, serverCount) + createTopic(topic2, 2, brokerCount) sendRecords(producer,numRecords = 1000, new TopicPartition(topic2, 0)) sendRecords(producer, numRecords = 1000, new TopicPartition(topic2, 1)) val topic3 = "tblab1" // does not match subscribed pattern - createTopic(topic3, 2, serverCount) + createTopic(topic3, 2, brokerCount) sendRecords(producer, numRecords = 1000, new TopicPartition(topic3, 0)) sendRecords(producer, numRecords = 1000, new TopicPartition(topic3, 1)) @@ -370,7 +371,7 @@ class PlaintextConsumerTest extends BaseConsumerTest { awaitAssignment(consumer, assignment) val topic4 = "tsomec" // matches subscribed pattern - createTopic(topic4, 2, serverCount) + createTopic(topic4, 2, brokerCount) sendRecords(producer, numRecords = 1000, new TopicPartition(topic4, 0)) sendRecords(producer, numRecords = 1000, new TopicPartition(topic4, 1)) @@ -404,7 +405,7 @@ class PlaintextConsumerTest extends BaseConsumerTest { // the first topic ('topic') matches first subscription pattern only val fooTopic = "foo" // matches both subscription patterns - createTopic(fooTopic, 1, serverCount) + createTopic(fooTopic, 1, brokerCount) sendRecords(producer, numRecords = 1000, new TopicPartition(fooTopic, 0)) assertEquals(0, consumer.assignment().size) @@ -419,7 +420,7 @@ class PlaintextConsumerTest extends BaseConsumerTest { awaitAssignment(consumer, assignment) val barTopic = "bar" // matches the next subscription pattern - createTopic(barTopic, 1, serverCount) + createTopic(barTopic, 1, brokerCount) sendRecords(producer, numRecords = 1000, new TopicPartition(barTopic, 0)) val pattern2 = Pattern.compile("...") // only 'foo' and 'bar' match this @@ -450,7 +451,7 @@ class PlaintextConsumerTest extends BaseConsumerTest { sendRecords(producer, numRecords, tp) val topic1 = "tblablac" // matches the subscription pattern - createTopic(topic1, 2, serverCount) + createTopic(topic1, 2, brokerCount) sendRecords(producer, numRecords = 1000, new TopicPartition(topic1, 0)) sendRecords(producer, numRecords = 1000, new TopicPartition(topic1, 1)) @@ -517,7 +518,7 @@ class PlaintextConsumerTest extends BaseConsumerTest { consumer.subscribe(List(topic).asJava) awaitAssignment(consumer, initialAssignment) - createTopic(otherTopic, 2, serverCount) + createTopic(otherTopic, 2, brokerCount) val expandedAssignment = initialAssignment ++ Set(new TopicPartition(otherTopic, 0), new TopicPartition(otherTopic, 1)) consumer.subscribe(List(topic, otherTopic).asJava) awaitAssignment(consumer, expandedAssignment) @@ -526,7 +527,7 @@ class PlaintextConsumerTest extends BaseConsumerTest { @Test def testShrinkingTopicSubscriptions() { val otherTopic = "other" - createTopic(otherTopic, 2, serverCount) + createTopic(otherTopic, 2, brokerCount) val initialAssignment = Set(new TopicPartition(topic, 0), new TopicPartition(topic, 1), new TopicPartition(otherTopic, 0), new TopicPartition(otherTopic, 1)) val consumer = createConsumer() consumer.subscribe(List(topic, otherTopic).asJava) @@ -781,7 +782,7 @@ class PlaintextConsumerTest extends BaseConsumerTest { val partitionCount = 30 val topics = Seq(topic1, topic2, topic3) topics.foreach { topicName => - createTopic(topicName, partitionCount, serverCount) + createTopic(topicName, partitionCount, brokerCount) } val partitions = topics.flatMap { topic => @@ -861,11 +862,11 @@ class PlaintextConsumerTest extends BaseConsumerTest { // for the topic partition assignment val (consumerGroup, consumerPollers) = createConsumerGroupAndWaitForAssignment(10, List(topic1, topic2), subscriptions) try { - validateGroupAssignment(consumerPollers, subscriptions, s"Did not get valid initial assignment for partitions ${subscriptions.asJava}") + validateGroupAssignment(consumerPollers, subscriptions) // add one more consumer and validate re-assignment addConsumersToGroupAndWaitForGroupAssignment(1, consumerGroup, consumerPollers, - List(topic1, topic2), subscriptions) + List(topic1, topic2), subscriptions, "roundrobin-group") } finally { consumerPollers.foreach(_.shutdown()) } @@ -900,11 +901,11 @@ class PlaintextConsumerTest extends BaseConsumerTest { // create a group of consumers, subscribe the consumers to the single topic and start polling // for the topic partition assignment val (consumerGroup, consumerPollers) = createConsumerGroupAndWaitForAssignment(9, List(topic), partitions) - validateGroupAssignment(consumerPollers, partitions, s"Did not get valid initial assignment for partitions ${partitions.asJava}") + validateGroupAssignment(consumerPollers, partitions) val prePartition2PollerId = reverse(consumerPollers.map(poller => (poller.getId, poller.consumerAssignment())).toMap) // add one more consumer and validate re-assignment - addConsumersToGroupAndWaitForGroupAssignment(1, consumerGroup, consumerPollers, List(topic), partitions) + addConsumersToGroupAndWaitForGroupAssignment(1, consumerGroup, consumerPollers, List(topic), partitions, "sticky-group") val postPartition2PollerId = reverse(consumerPollers.map(poller => (poller.getId, poller.consumerAssignment())).toMap) val keys = prePartition2PollerId.keySet.union(postPartition2PollerId.keySet) @@ -945,7 +946,7 @@ class PlaintextConsumerTest extends BaseConsumerTest { val consumerPollers = subscribeConsumers(consumersInGroup, List(topic, topic1)) try { - validateGroupAssignment(consumerPollers, subscriptions, s"Did not get valid initial assignment for partitions ${subscriptions.asJava}") + validateGroupAssignment(consumerPollers, subscriptions) // add 2 more consumers and validate re-assignment addConsumersToGroupAndWaitForGroupAssignment(2, consumersInGroup, consumerPollers, List(topic, topic1), subscriptions) @@ -1040,7 +1041,7 @@ class PlaintextConsumerTest extends BaseConsumerTest { @Test def testAutoCommitIntercept() { val topic2 = "topic2" - createTopic(topic2, 2, serverCount) + createTopic(topic2, 2, brokerCount) // produce records val numRecords = 100 @@ -1336,7 +1337,7 @@ class PlaintextConsumerTest extends BaseConsumerTest { @Test def testAutoCommitOnRebalance() { val topic2 = "topic2" - createTopic(topic2, 2, serverCount) + createTopic(topic2, 2, brokerCount) this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true") val consumer = createConsumer() @@ -1376,7 +1377,7 @@ class PlaintextConsumerTest extends BaseConsumerTest { def testPerPartitionLeadMetricsCleanUpWithSubscribe() { val numMessages = 1000 val topic2 = "topic2" - createTopic(topic2, 2, serverCount) + createTopic(topic2, 2, brokerCount) // send some messages. val producer = createProducer() sendRecords(producer, numMessages, tp) @@ -1415,7 +1416,7 @@ class PlaintextConsumerTest extends BaseConsumerTest { def testPerPartitionLagMetricsCleanUpWithSubscribe() { val numMessages = 1000 val topic2 = "topic2" - createTopic(topic2, 2, serverCount) + createTopic(topic2, 2, brokerCount) // send some messages. val producer = createProducer() sendRecords(producer, numMessages, tp) @@ -1635,22 +1636,39 @@ class PlaintextConsumerTest extends BaseConsumerTest { consumerPollers += timeoutPoller // validate the initial assignment - validateGroupAssignment(consumerPollers, subscriptions, s"Did not get valid initial assignment for partitions ${subscriptions.asJava}") + validateGroupAssignment(consumerPollers, subscriptions) // stop polling and close one of the consumers, should trigger partition re-assignment among alive consumers timeoutPoller.shutdown() if (closeConsumer) timeoutConsumer.close() - val maxSessionTimeout = this.serverConfig.getProperty(KafkaConfig.GroupMaxSessionTimeoutMsProp).toLong validateGroupAssignment(consumerPollers, subscriptions, - s"Did not get valid assignment for partitions ${subscriptions.asJava} after one consumer left", 3 * maxSessionTimeout) + Some(s"Did not get valid assignment for partitions ${subscriptions.asJava} after one consumer left"), 3 * groupMaxSessionTimeoutMs) // done with pollers and consumers for (poller <- consumerPollers) poller.shutdown() } + /** + * Creates consumer pollers corresponding to a given consumer group, one per consumer; subscribes consumers to + * 'topicsToSubscribe' topics, waits until consumers get topics assignment. + * + * When the function returns, consumer pollers will continue to poll until shutdown is called on every poller. + * + * @param consumerGroup consumer group + * @param topicsToSubscribe topics to which consumers will subscribe to + * @return collection of consumer pollers + */ + def subscribeConsumers(consumerGroup: mutable.Buffer[KafkaConsumer[Array[Byte], Array[Byte]]], + topicsToSubscribe: List[String]): mutable.Buffer[ConsumerAssignmentPoller] = { + val consumerPollers = mutable.Buffer[ConsumerAssignmentPoller]() + for (consumer <- consumerGroup) + consumerPollers += subscribeConsumerAndStartPolling(consumer, topicsToSubscribe) + consumerPollers + } + /** * Creates topic 'topicName' with 'numPartitions' partitions and produces 'recordsPerPartition' * records to each partition @@ -1659,7 +1677,7 @@ class PlaintextConsumerTest extends BaseConsumerTest { topicName: String, numPartitions: Int, recordsPerPartition: Int): Set[TopicPartition] = { - createTopic(topicName, numPartitions, serverCount) + createTopic(topicName, numPartitions, brokerCount) var parts = Set[TopicPartition]() for (partition <- 0 until numPartitions) { val tp = new TopicPartition(topicName, partition) @@ -1670,51 +1688,16 @@ class PlaintextConsumerTest extends BaseConsumerTest { } /** - * Subscribes consumer 'consumer' to a given list of topics 'topicsToSubscribe', creates - * consumer poller and starts polling. - * Assumes that the consumer is not subscribed to any topics yet - * - * @param consumer consumer - * @param topicsToSubscribe topics that this consumer will subscribe to - * @return consumer poller for the given consumer - */ - def subscribeConsumerAndStartPolling(consumer: Consumer[Array[Byte], Array[Byte]], - topicsToSubscribe: List[String]): ConsumerAssignmentPoller = { - assertEquals(0, consumer.assignment().size) - val consumerPoller = new ConsumerAssignmentPoller(consumer, topicsToSubscribe) - consumerPoller.start() - consumerPoller - } - - /** - * Creates consumer pollers corresponding to a given consumer group, one per consumer; subscribes consumers to - * 'topicsToSubscribe' topics, waits until consumers get topics assignment. - * - * When the function returns, consumer pollers will continue to poll until shutdown is called on every poller. - * - * @param consumerGroup consumer group - * @param topicsToSubscribe topics to which consumers will subscribe to - * @return collection of consumer pollers - */ - def subscribeConsumers(consumerGroup: Buffer[KafkaConsumer[Array[Byte], Array[Byte]]], - topicsToSubscribe: List[String]): Buffer[ConsumerAssignmentPoller] = { - val consumerPollers = Buffer[ConsumerAssignmentPoller]() - for (consumer <- consumerGroup) - consumerPollers += subscribeConsumerAndStartPolling(consumer, topicsToSubscribe) - consumerPollers - } - - /** - * Creates 'consumerCount' consumers and consumer pollers, one per consumer; subscribes consumers to - * 'topicsToSubscribe' topics, waits until consumers get topics assignment. - * - * When the function returns, consumer pollers will continue to poll until shutdown is called on every poller. - * - * @param consumerCount number of consumers to create - * @param topicsToSubscribe topics to which consumers will subscribe to - * @param subscriptions set of all topic partitions - * @return collection of created consumers and collection of corresponding consumer pollers - */ + * Creates 'consumerCount' consumers and consumer pollers, one per consumer; subscribes consumers to + * 'topicsToSubscribe' topics, waits until consumers get topics assignment. + * + * When the function returns, consumer pollers will continue to poll until shutdown is called on every poller. + * + * @param consumerCount number of consumers to create + * @param topicsToSubscribe topics to which consumers will subscribe to + * @param subscriptions set of all topic partitions + * @return collection of created consumers and collection of corresponding consumer pollers + */ def createConsumerGroupAndWaitForAssignment(consumerCount: Int, topicsToSubscribe: List[String], subscriptions: Set[TopicPartition]): (Buffer[KafkaConsumer[Array[Byte], Array[Byte]]], Buffer[ConsumerAssignmentPoller]) = { @@ -1728,54 +1711,6 @@ class PlaintextConsumerTest extends BaseConsumerTest { (consumerGroup, consumerPollers) } - /** - * Create 'numOfConsumersToAdd' consumers add then to the consumer group 'consumerGroup', and create corresponding - * pollers for these consumers. Wait for partition re-assignment and validate. - * - * Currently, assignment validation requires that total number of partitions is greater or equal to - * number of consumers, so subscriptions.size must be greater or equal the resulting number of consumers in the group - * - * @param numOfConsumersToAdd number of consumers to create and add to the consumer group - * @param consumerGroup current consumer group - * @param consumerPollers current consumer pollers - * @param topicsToSubscribe topics to which new consumers will subscribe to - * @param subscriptions set of all topic partitions - */ - def addConsumersToGroupAndWaitForGroupAssignment(numOfConsumersToAdd: Int, - consumerGroup: Buffer[KafkaConsumer[Array[Byte], Array[Byte]]], - consumerPollers: Buffer[ConsumerAssignmentPoller], - topicsToSubscribe: List[String], - subscriptions: Set[TopicPartition]): Unit = { - assertTrue(consumerGroup.size + numOfConsumersToAdd <= subscriptions.size) - for (_ <- 0 until numOfConsumersToAdd) { - val consumer = createConsumer() - consumerGroup += consumer - consumerPollers += subscribeConsumerAndStartPolling(consumer, topicsToSubscribe) - } - - // wait until topics get re-assigned and validate assignment - validateGroupAssignment(consumerPollers, subscriptions, - s"Did not get valid assignment for partitions ${subscriptions.asJava} after we added $numOfConsumersToAdd consumer(s)") - } - - /** - * Wait for consumers to get partition assignment and validate it. - * - * @param consumerPollers consumer pollers corresponding to the consumer group we are testing - * @param subscriptions set of all topic partitions - * @param msg message to print when waiting for/validating assignment fails - */ - def validateGroupAssignment(consumerPollers: Buffer[ConsumerAssignmentPoller], - subscriptions: Set[TopicPartition], - msg: String, - waitTime: Long = 10000L): Unit = { - TestUtils.waitUntilTrue(() => { - val assignments = Buffer[Set[TopicPartition]]() - consumerPollers.foreach(assignments += _.consumerAssignment()) - isPartitionAssignmentValid(assignments, subscriptions) - }, msg, waitTime) - } - def changeConsumerGroupSubscriptionAndValidateAssignment(consumerPollers: Buffer[ConsumerAssignmentPoller], topicsToSubscribe: List[String], subscriptions: Set[TopicPartition]): Unit = { @@ -1785,11 +1720,11 @@ class PlaintextConsumerTest extends BaseConsumerTest { // since subscribe call to poller does not actually call consumer subscribe right away, wait // until subscribe is called on all consumers TestUtils.waitUntilTrue(() => { - consumerPollers forall (poller => poller.isSubscribeRequestProcessed()) - }, s"Failed to call subscribe on all consumers in the group for subscription ${subscriptions}", 1000L) + consumerPollers.forall { poller => poller.isSubscribeRequestProcessed } + }, s"Failed to call subscribe on all consumers in the group for subscription $subscriptions", 1000L) validateGroupAssignment(consumerPollers, subscriptions, - s"Did not get valid assignment for partitions ${subscriptions.asJava} after we changed subscription") + Some(s"Did not get valid assignment for partitions ${subscriptions.asJava} after we changed subscription")) } def changeConsumerSubscriptionAndValidateAssignment[K, V](consumer: Consumer[K, V], diff --git a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala index c353b526ddd..bbb3d195460 100644 --- a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala @@ -39,7 +39,7 @@ class SaslClientsWithInvalidCredentialsTest extends IntegrationTestHarness with override protected val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism)) val consumerCount = 1 val producerCount = 1 - val serverCount = 1 + val brokerCount = 1 this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "1") this.serverConfig.setProperty(KafkaConfig.TransactionsTopicReplicationFactorProp, "1") @@ -62,7 +62,7 @@ class SaslClientsWithInvalidCredentialsTest extends IntegrationTestHarness with startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), Both, JaasTestUtils.KafkaServerContextName)) super.setUp() - createTopic(topic, numPartitions, serverCount) + createTopic(topic, numPartitions, brokerCount) } @After diff --git a/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala index c15a5080bf2..59c6d3465c6 100644 --- a/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala @@ -13,7 +13,7 @@ package kafka.api import java.io.File -import java.util.Locale +import java.util.{Locale, Properties} import kafka.server.KafkaConfig import kafka.utils.{JaasTestUtils, TestUtils} @@ -47,6 +47,11 @@ class SaslPlainPlaintextConsumerTest extends BaseConsumerTest with SaslSetup { closeSasl() } + override def modifyConfigs(props: Seq[Properties]): Unit = { + super.modifyConfigs(props) + configureListeners(props) + } + /** * Checks that everyone can access ZkUtils.SecureZkRootPaths and ZkUtils.SensitiveZkRootPaths * when zookeeper.set.acl=false, even if ZooKeeper is SASL-enabled. diff --git a/core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala b/core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala index 41e1ff96b2d..86865758990 100644 --- a/core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala +++ b/core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala @@ -39,7 +39,7 @@ import scala.collection.JavaConverters._ class DynamicConnectionQuotaTest extends BaseRequestTest { - override def numBrokers = 1 + override def brokerCount = 1 val topic = "test" val listener = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT) @@ -49,7 +49,7 @@ class DynamicConnectionQuotaTest extends BaseRequestTest { @Before override def setUp(): Unit = { super.setUp() - TestUtils.createTopic(zkClient, topic, numBrokers, numBrokers, servers) + TestUtils.createTopic(zkClient, topic, brokerCount, brokerCount, servers) } @After @@ -64,8 +64,8 @@ class DynamicConnectionQuotaTest extends BaseRequestTest { } } - override protected def propertyOverrides(properties: Properties): Unit = { - super.propertyOverrides(properties) + override protected def brokerPropertyOverrides(properties: Properties): Unit = { + super.brokerPropertyOverrides(properties) } @Test diff --git a/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala b/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala index 07885cdbb00..8842171185a 100644 --- a/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala +++ b/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala @@ -39,7 +39,7 @@ import org.junit.{After, Before, Test} import scala.collection.JavaConverters._ class GssapiAuthenticationTest extends IntegrationTestHarness with SaslSetup { - override val serverCount = 1 + override val brokerCount = 1 override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT private val kafkaClientSaslMechanism = "GSSAPI" @@ -70,7 +70,7 @@ class GssapiAuthenticationTest extends IntegrationTestHarness with SaslSetup { clientConfig.put(CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG, "5000") // create the test topic with all the brokers as replicas - createTopic(topic, 2, serverCount) + createTopic(topic, 2, brokerCount) } @After diff --git a/core/src/test/scala/integration/kafka/server/ScramServerStartupTest.scala b/core/src/test/scala/integration/kafka/server/ScramServerStartupTest.scala index 0f4065080ce..4c2c64030b4 100644 --- a/core/src/test/scala/integration/kafka/server/ScramServerStartupTest.scala +++ b/core/src/test/scala/integration/kafka/server/ScramServerStartupTest.scala @@ -36,7 +36,7 @@ import scala.collection.JavaConverters._ */ class ScramServerStartupTest extends IntegrationTestHarness with SaslSetup { - override val serverCount = 1 + override val brokerCount = 1 private val kafkaClientSaslMechanism = "SCRAM-SHA-256" private val kafkaServerSaslMechanisms = Collections.singletonList("SCRAM-SHA-256").asScala diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala index 31283468aac..a5f1bab09c2 100755 --- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala @@ -33,7 +33,7 @@ import scala.collection.JavaConverters._ class AddPartitionsTest extends BaseRequestTest { - protected override def numBrokers: Int = 4 + override def brokerCount: Int = 4 val partitionId = 0 diff --git a/core/src/test/scala/unit/kafka/admin/DelegationTokenCommandTest.scala b/core/src/test/scala/unit/kafka/admin/DelegationTokenCommandTest.scala index a4c27e50288..0c9f4d3d052 100644 --- a/core/src/test/scala/unit/kafka/admin/DelegationTokenCommandTest.scala +++ b/core/src/test/scala/unit/kafka/admin/DelegationTokenCommandTest.scala @@ -39,7 +39,7 @@ class DelegationTokenCommandTest extends BaseRequestTest with SaslSetup { protected override val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism)) var adminClient: org.apache.kafka.clients.admin.AdminClient = null - override def numBrokers = 1 + override def brokerCount = 1 @Before override def setUp(): Unit = { @@ -48,11 +48,11 @@ class DelegationTokenCommandTest extends BaseRequestTest with SaslSetup { } override def generateConfigs = { - val props = TestUtils.createBrokerConfigs(numBrokers, zkConnect, + val props = TestUtils.createBrokerConfigs(brokerCount, zkConnect, enableControlledShutdown = false, interBrokerSecurityProtocol = Some(securityProtocol), trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties, enableToken = true) - props.foreach(propertyOverrides) + props.foreach(brokerPropertyOverrides) props.map(KafkaConfig.fromProps) } diff --git a/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala b/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala index d6262b3da34..a54e5fc2f6c 100644 --- a/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala @@ -33,7 +33,7 @@ import scala.collection.JavaConverters._ class AbstractCreateTopicsRequestTest extends BaseRequestTest { - override def propertyOverrides(properties: Properties): Unit = + override def brokerPropertyOverrides(properties: Properties): Unit = properties.put(KafkaConfig.AutoCreateTopicsEnableProp, false.toString) def topicsReq(topics: Seq[CreatableTopic], diff --git a/core/src/test/scala/unit/kafka/server/AddPartitionsToTxnRequestTest.scala b/core/src/test/scala/unit/kafka/server/AddPartitionsToTxnRequestTest.scala index 9071f95e438..d15409ced66 100644 --- a/core/src/test/scala/unit/kafka/server/AddPartitionsToTxnRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/AddPartitionsToTxnRequestTest.scala @@ -31,7 +31,7 @@ class AddPartitionsToTxnRequestTest extends BaseRequestTest { private val topic1 = "foobartopic" val numPartitions = 3 - override def propertyOverrides(properties: Properties): Unit = + override def brokerPropertyOverrides(properties: Properties): Unit = properties.put(KafkaConfig.AutoCreateTopicsEnableProp, false.toString) @Before diff --git a/core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala b/core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala index 28ed81d060b..d61c14ca60d 100644 --- a/core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala @@ -33,7 +33,7 @@ import scala.util.Random class AlterReplicaLogDirsRequestTest extends BaseRequestTest { override val logDirCount = 5 - override val numBrokers = 1 + override val brokerCount = 1 val topic = "topic" diff --git a/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala index 83f7111bb2b..ab70aec3791 100644 --- a/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala @@ -40,7 +40,7 @@ object ApiVersionsRequestTest { class ApiVersionsRequestTest extends BaseRequestTest { - override def numBrokers: Int = 1 + override def brokerCount: Int = 1 @Test def testApiVersionsRequest() { diff --git a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala index 09ffe4fe83c..a92aeaf66e7 100644 --- a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala @@ -24,7 +24,6 @@ import java.util.Properties import kafka.api.IntegrationTestHarness import kafka.network.SocketServer -import kafka.utils._ import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.types.Struct import org.apache.kafka.common.protocol.ApiKeys @@ -32,22 +31,19 @@ import org.apache.kafka.common.requests.{AbstractRequest, AbstractRequestRespons import org.apache.kafka.common.security.auth.SecurityProtocol abstract class BaseRequestTest extends IntegrationTestHarness { - override val serverCount: Int = numBrokers private var correlationId = 0 // If required, set number of brokers - protected def numBrokers: Int = 3 + override def brokerCount: Int = 3 // If required, override properties by mutating the passed Properties object - protected def propertyOverrides(properties: Properties) {} + protected def brokerPropertyOverrides(properties: Properties) {} - override def generateConfigs = { - val props = TestUtils.createBrokerConfigs(numBrokers, zkConnect, - enableControlledShutdown = false, - interBrokerSecurityProtocol = Some(securityProtocol), - trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties, logDirCount = logDirCount) - props.foreach(propertyOverrides) - props.map(KafkaConfig.fromProps) + override def modifyConfigs(props: Seq[Properties]): Unit = { + props.foreach { p => + p.put(KafkaConfig.ControlledShutdownEnableProp, "false") + brokerPropertyOverrides(p) + } } def anySocketServer = { diff --git a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala index db2028c32b2..709b3c977c0 100644 --- a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala @@ -55,7 +55,7 @@ class CreateTopicsRequestTest extends AbstractCreateTopicsRequestTest { validateErrorCreateTopicsRequests(topicsReq(Seq(topicReq("error-partitions", numPartitions = -1))), Map("error-partitions" -> error(Errors.INVALID_PARTITIONS)), checkErrorMessage = false) validateErrorCreateTopicsRequests(topicsReq(Seq(topicReq("error-replication", - replicationFactor = numBrokers + 1))), + replicationFactor = brokerCount + 1))), Map("error-replication" -> error(Errors.INVALID_REPLICATION_FACTOR)), checkErrorMessage = false) validateErrorCreateTopicsRequests(topicsReq(Seq(topicReq("error-config", config=Map("not.a.property" -> "error")))), @@ -71,7 +71,7 @@ class CreateTopicsRequestTest extends AbstractCreateTopicsRequestTest { validateErrorCreateTopicsRequests(topicsReq(Seq( topicReq(existingTopic), topicReq("partial-partitions", numPartitions = -1), - topicReq("partial-replication", replicationFactor=numBrokers + 1), + topicReq("partial-replication", replicationFactor=brokerCount + 1), topicReq("partial-assignment", assignment=Map(0 -> List(0, 1), 1 -> List(0))), topicReq("partial-none"))), Map( diff --git a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithPolicyTest.scala b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithPolicyTest.scala index 4fc3244053d..0395484cf3d 100644 --- a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithPolicyTest.scala +++ b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithPolicyTest.scala @@ -32,8 +32,8 @@ import scala.collection.JavaConverters._ class CreateTopicsRequestWithPolicyTest extends AbstractCreateTopicsRequestTest { import CreateTopicsRequestWithPolicyTest._ - override def propertyOverrides(properties: Properties): Unit = { - super.propertyOverrides(properties) + override def brokerPropertyOverrides(properties: Properties): Unit = { + super.brokerPropertyOverrides(properties) properties.put(KafkaConfig.CreateTopicPolicyClassNameProp, classOf[Policy].getName) } @@ -94,7 +94,7 @@ class CreateTopicsRequestWithPolicyTest extends AbstractCreateTopicsRequestTest Some("Topic 'existing-topic' already exists.")))) validateErrorCreateTopicsRequests(topicsReq(Seq(topicReq("error-replication", - numPartitions = 10, replicationFactor = numBrokers + 1)), validateOnly = true), + numPartitions = 10, replicationFactor = brokerCount + 1)), validateOnly = true), Map("error-replication" -> error(Errors.INVALID_REPLICATION_FACTOR, Some("Replication factor: 4 larger than available brokers: 3.")))) diff --git a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsOnPlainTextTest.scala b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsOnPlainTextTest.scala index 6d56a023930..0a7e194dc40 100644 --- a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsOnPlainTextTest.scala +++ b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsOnPlainTextTest.scala @@ -29,7 +29,7 @@ import scala.concurrent.ExecutionException class DelegationTokenRequestsOnPlainTextTest extends BaseRequestTest { var adminClient: AdminClient = null - override def numBrokers = 1 + override def brokerCount = 1 @Before override def setUp(): Unit = { diff --git a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala index 6f79a9a20a6..ae650163261 100644 --- a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala +++ b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala @@ -38,7 +38,7 @@ class DelegationTokenRequestsTest extends BaseRequestTest with SaslSetup { protected override val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism)) var adminClient: AdminClient = null - override def numBrokers = 1 + override def brokerCount = 1 @Before override def setUp(): Unit = { @@ -47,11 +47,11 @@ class DelegationTokenRequestsTest extends BaseRequestTest with SaslSetup { } override def generateConfigs = { - val props = TestUtils.createBrokerConfigs(numBrokers, zkConnect, + val props = TestUtils.createBrokerConfigs(brokerCount, zkConnect, enableControlledShutdown = false, interBrokerSecurityProtocol = Some(securityProtocol), trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties, enableToken = true) - props.foreach(propertyOverrides) + props.foreach(brokerPropertyOverrides) props.map(KafkaConfig.fromProps) } diff --git a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsWithDisableTokenFeatureTest.scala b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsWithDisableTokenFeatureTest.scala index 1203f839f89..7de624f2d0e 100644 --- a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsWithDisableTokenFeatureTest.scala +++ b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsWithDisableTokenFeatureTest.scala @@ -36,7 +36,7 @@ class DelegationTokenRequestsWithDisableTokenFeatureTest extends BaseRequestTest protected override val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism)) var adminClient: AdminClient = null - override def numBrokers = 1 + override def brokerCount = 1 @Before override def setUp(): Unit = { diff --git a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestWithDeletionDisabledTest.scala b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestWithDeletionDisabledTest.scala index 7240d77076a..6a011653f2e 100644 --- a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestWithDeletionDisabledTest.scala +++ b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestWithDeletionDisabledTest.scala @@ -29,14 +29,14 @@ import java.util.Collections class DeleteTopicsRequestWithDeletionDisabledTest extends BaseRequestTest { - override def numBrokers: Int = 1 + override def brokerCount: Int = 1 override def generateConfigs = { - val props = TestUtils.createBrokerConfigs(numBrokers, zkConnect, + val props = TestUtils.createBrokerConfigs(brokerCount, zkConnect, enableControlledShutdown = false, enableDeleteTopic = false, interBrokerSecurityProtocol = Some(securityProtocol), trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties, logDirCount = logDirCount) - props.foreach(propertyOverrides) + props.foreach(brokerPropertyOverrides) props.map(KafkaConfig.fromProps) } diff --git a/core/src/test/scala/unit/kafka/server/DescribeLogDirsRequestTest.scala b/core/src/test/scala/unit/kafka/server/DescribeLogDirsRequestTest.scala index 5a0244b75f3..a1e4c733efb 100644 --- a/core/src/test/scala/unit/kafka/server/DescribeLogDirsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/DescribeLogDirsRequestTest.scala @@ -27,7 +27,7 @@ import java.io.File class DescribeLogDirsRequestTest extends BaseRequestTest { override val logDirCount = 2 - override val numBrokers: Int = 1 + override val brokerCount: Int = 1 val topic = "topic" val partitionNum = 2 diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala b/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala index 673abe693c9..4ae7d936975 100644 --- a/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala @@ -32,7 +32,7 @@ import org.junit.Test class FetchRequestDownConversionConfigTest extends BaseRequestTest { private var producer: KafkaProducer[String, String] = null - override def numBrokers: Int = 1 + override def brokerCount: Int = 1 override def setUp(): Unit = { super.setUp() @@ -45,8 +45,8 @@ class FetchRequestDownConversionConfigTest extends BaseRequestTest { super.tearDown() } - override protected def propertyOverrides(properties: Properties): Unit = { - super.propertyOverrides(properties) + override protected def brokerPropertyOverrides(properties: Properties): Unit = { + super.brokerPropertyOverrides(properties) properties.put(KafkaConfig.LogMessageDownConversionEnableProp, "false") } diff --git a/core/src/test/scala/unit/kafka/server/KafkaMetricReporterExceptionHandlingTest.scala b/core/src/test/scala/unit/kafka/server/KafkaMetricReporterExceptionHandlingTest.scala index 30f3b234613..f3580cf5ec4 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaMetricReporterExceptionHandlingTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaMetricReporterExceptionHandlingTest.scala @@ -36,9 +36,9 @@ import java.util.concurrent.atomic.AtomicInteger */ class KafkaMetricReporterExceptionHandlingTest extends BaseRequestTest { - override def numBrokers: Int = 1 + override def brokerCount: Int = 1 - override def propertyOverrides(properties: Properties): Unit = { + override def brokerPropertyOverrides(properties: Properties): Unit = { properties.put(KafkaConfig.MetricReporterClassesProp, classOf[KafkaMetricReporterExceptionHandlingTest.BadReporter].getName + "," + classOf[KafkaMetricReporterExceptionHandlingTest.GoodReporter].getName) } diff --git a/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala b/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala index d56a9f0559c..3eff38fdb93 100644 --- a/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala +++ b/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala @@ -41,7 +41,7 @@ class LogDirFailureTest extends IntegrationTestHarness { val producerCount: Int = 1 val consumerCount: Int = 1 - val serverCount: Int = 2 + val brokerCount: Int = 2 private val topic = "topic" private val partitionNum = 12 override val logDirCount = 3 @@ -52,7 +52,7 @@ class LogDirFailureTest extends IntegrationTestHarness { @Before override def setUp() { super.setUp() - createTopic(topic, partitionNum, serverCount) + createTopic(topic, partitionNum, brokerCount) } @Test @@ -71,7 +71,7 @@ class LogDirFailureTest extends IntegrationTestHarness { var server: KafkaServer = null try { - val props = TestUtils.createBrokerConfig(serverCount, zkConnect, logDirCount = 3) + val props = TestUtils.createBrokerConfig(brokerCount, zkConnect, logDirCount = 3) props.put(KafkaConfig.InterBrokerProtocolVersionProp, "0.11.0") props.put(KafkaConfig.LogMessageFormatVersionProp, "0.11.0") val kafkaConfig = KafkaConfig.fromProps(props) @@ -118,7 +118,7 @@ class LogDirFailureTest extends IntegrationTestHarness { // has fetched from the leader and attempts to append to the offline replica. producer.send(record).get - assertEquals(serverCount, leaderServer.replicaManager.getPartition(new TopicPartition(topic, anotherPartitionWithTheSameLeader)).get.inSyncReplicas.size) + assertEquals(brokerCount, leaderServer.replicaManager.getPartition(new TopicPartition(topic, anotherPartitionWithTheSameLeader)).get.inSyncReplicas.size) followerServer.replicaManager.replicaFetcherManager.fetcherThreadMap.values.foreach { thread => assertFalse("ReplicaFetcherThread should still be working if its partition count > 0", thread.isShutdownComplete) } diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala index 04b34675d86..7b52e7b3453 100755 --- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala +++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala @@ -38,11 +38,11 @@ class LogOffsetTest extends BaseRequestTest { private lazy val time = new MockTime - protected override def numBrokers = 1 + override def brokerCount = 1 protected override def brokerTime(brokerId: Int) = time - protected override def propertyOverrides(props: Properties): Unit = { + protected override def brokerPropertyOverrides(props: Properties): Unit = { props.put("log.flush.interval.messages", "1") props.put("num.partitions", "20") props.put("log.retention.hours", "10") diff --git a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala index bde16b6de3f..f920d94310c 100644 --- a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala @@ -34,7 +34,7 @@ import scala.collection.JavaConverters._ class MetadataRequestTest extends BaseRequestTest { - override def propertyOverrides(properties: Properties) { + override def brokerPropertyOverrides(properties: Properties) { properties.setProperty(KafkaConfig.DefaultReplicationFactorProp, "2") properties.setProperty(KafkaConfig.RackProp, s"rack/${properties.getProperty(KafkaConfig.BrokerIdProp)}") } diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala index 671f9e0427e..d04f39f9c83 100644 --- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala @@ -49,7 +49,7 @@ import scala.collection.mutable.ListBuffer class RequestQuotaTest extends BaseRequestTest { - override def numBrokers: Int = 1 + override def brokerCount: Int = 1 private val topic = "topic-1" private val numPartitions = 1 @@ -66,7 +66,7 @@ class RequestQuotaTest extends BaseRequestTest { private val executor = Executors.newCachedThreadPool private val tasks = new ListBuffer[Task] - override def propertyOverrides(properties: Properties): Unit = { + override def brokerPropertyOverrides(properties: Properties): Unit = { properties.put(KafkaConfig.ControlledShutdownEnableProp, "false") properties.put(KafkaConfig.OffsetsTopicReplicationFactorProp, "1") properties.put(KafkaConfig.OffsetsTopicPartitionsProp, "1") diff --git a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala index 185a2f49e63..7dcc96baaf7 100644 --- a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala @@ -36,7 +36,7 @@ class SaslApiVersionsRequestTest extends BaseRequestTest with SaslSetup { private val kafkaServerSaslMechanisms = List("PLAIN") protected override val serverSaslProperties = Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism)) protected override val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism)) - override def numBrokers = 1 + override def brokerCount = 1 @Before override def setUp(): Unit = { diff --git a/core/src/test/scala/unit/kafka/server/StopReplicaRequestTest.scala b/core/src/test/scala/unit/kafka/server/StopReplicaRequestTest.scala index 2d0f1db3727..f246b253a25 100644 --- a/core/src/test/scala/unit/kafka/server/StopReplicaRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/StopReplicaRequestTest.scala @@ -28,7 +28,7 @@ import collection.JavaConverters._ class StopReplicaRequestTest extends BaseRequestTest { override val logDirCount = 2 - override val numBrokers: Int = 1 + override val brokerCount: Int = 1 val topic = "topic" val partitionNum = 2