KAFKA-7893; Refactor ConsumerBounceTest to reuse functionality from BaseConsumerTest (#6238)

This PR should help address the flakiness in the `ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup` test (https://issues.apache.org/jira/browse/KAFKA-7965). I tested this locally and verified that it significantly reduces flakiness: 25/25 runs now pass, whereas running the test 25 times on trunk yielded `18/25` passes.

It does so by reusing the less flaky consumer integration-testing functionality in `BaseConsumerTest`. Most notably, the test now makes use of the `ConsumerAssignmentPoller` class: each consumer polls continuously, rather than relying on the more batch-oriented polling we had in `ConsumerBounceTest#waitForRebalance()`.
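To illustrate the pattern, here is a minimal sketch of the continuous-polling idea (not the actual test code — `ContinuousPoller` and its members are hypothetical simplifications of the `ConsumerAssignmentPoller` this PR reuses):

```scala
import java.time.Duration
import java.util.concurrent.atomic.AtomicInteger
import org.apache.kafka.clients.consumer.Consumer

// A daemon-style thread that polls without pause, so the consumer keeps
// sending heartbeats and servicing rebalance callbacks while the test
// asserts on receivedMessages from another thread.
class ContinuousPoller(consumer: Consumer[Array[Byte], Array[Byte]]) extends Thread {
  val receivedMessages = new AtomicInteger(0)
  @volatile private var running = true

  override def run(): Unit = {
    while (running) {
      // Short poll: returns quickly and keeps group membership alive even
      // when no records arrive, unlike infrequent batch-oriented polls.
      receivedMessages.addAndGet(consumer.poll(Duration.ofMillis(50)).count())
    }
  }

  def shutdown(): Unit = running = false
}
```

Because the poll loop never stalls, group membership stays stable across broker bounces, which is the property the PR relies on to reduce flakiness.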

Reviewers: Jason Gustafson <jason@confluent.io>
Author: Stanislav Kozlovski, 2019-04-05 13:08:04 -07:00; committed by Jason Gustafson
parent 4aa2cfe467, commit cc4fde35c9
41 changed files with 393 additions and 405 deletions


@@ -87,12 +87,12 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
     super.tearDown()
   }

-  val serverCount = 3
+  val brokerCount = 3
   val consumerCount = 1
   val producerCount = 1

   override def generateConfigs = {
-    val cfgs = TestUtils.createBrokerConfigs(serverCount, zkConnect, interBrokerSecurityProtocol = Some(securityProtocol),
+    val cfgs = TestUtils.createBrokerConfigs(brokerCount, zkConnect, interBrokerSecurityProtocol = Some(securityProtocol),
       trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties, logDirCount = 2)
     cfgs.foreach { config =>
       config.setProperty(KafkaConfig.ListenersProp, s"${listenerName.value}://localhost:${TestUtils.RandomPort}")
@@ -197,7 +197,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
       assertEquals(3, partition.replicas.size)
       partition.replicas.asScala.foreach { replica =>
         assertTrue(replica.id >= 0)
-        assertTrue(replica.id < serverCount)
+        assertTrue(replica.id < brokerCount)
       }
       assertEquals("No duplicate replica ids", partition.replicas.size, partition.replicas.asScala.map(_.id).distinct.size)
@@ -301,10 +301,10 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
     val topic = "topic"
     val leaderByPartition = createTopic(topic, numPartitions = 10, replicationFactor = 1)
     val partitionsByBroker = leaderByPartition.groupBy { case (partitionId, leaderId) => leaderId }.mapValues(_.keys.toSeq)
-    val brokers = (0 until serverCount).map(Integer.valueOf)
+    val brokers = (0 until brokerCount).map(Integer.valueOf)
     val logDirInfosByBroker = client.describeLogDirs(brokers.asJava).all.get
-    (0 until serverCount).foreach { brokerId =>
+    (0 until brokerCount).foreach { brokerId =>
       val server = servers.find(_.config.brokerId == brokerId).get
       val expectedPartitions = partitionsByBroker(brokerId)
       val logDirInfos = logDirInfosByBroker.get(brokerId)
@@ -361,7 +361,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
       assertTrue(exception.getCause.isInstanceOf[UnknownTopicOrPartitionException])
     }

-    createTopic(topic, numPartitions = 1, replicationFactor = serverCount)
+    createTopic(topic, numPartitions = 1, replicationFactor = brokerCount)
     servers.foreach { server =>
       val logDir = server.logManager.getLog(tp).get.dir.getParent
       assertEquals(firstReplicaAssignment(new TopicPartitionReplica(topic, 0, server.config.brokerId)), logDir)
@@ -759,7 +759,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
   @Test
   def testSeekAfterDeleteRecords(): Unit = {
-    createTopic(topic, numPartitions = 2, replicationFactor = serverCount)
+    createTopic(topic, numPartitions = 2, replicationFactor = brokerCount)

     client = AdminClient.create(createConfig)
@@ -788,7 +788,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
   @Test
   def testLogStartOffsetCheckpoint(): Unit = {
-    createTopic(topic, numPartitions = 2, replicationFactor = serverCount)
+    createTopic(topic, numPartitions = 2, replicationFactor = brokerCount)

     client = AdminClient.create(createConfig)
@@ -801,7 +801,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
     var lowWatermark: Option[Long] = Some(result.lowWatermarks.get(topicPartition).get.lowWatermark)
     assertEquals(Some(5), lowWatermark)

-    for (i <- 0 until serverCount) {
+    for (i <- 0 until brokerCount) {
       killBroker(i)
     }
     restartDeadBrokers()
@@ -828,7 +828,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
   @Test
   def testLogStartOffsetAfterDeleteRecords(): Unit = {
-    createTopic(topic, numPartitions = 2, replicationFactor = serverCount)
+    createTopic(topic, numPartitions = 2, replicationFactor = brokerCount)

     client = AdminClient.create(createConfig)
@@ -842,13 +842,13 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
     val lowWatermark = result.lowWatermarks.get(topicPartition).get.lowWatermark
     assertEquals(3L, lowWatermark)

-    for (i <- 0 until serverCount)
+    for (i <- 0 until brokerCount)
       assertEquals(3, servers(i).replicaManager.localReplica(topicPartition).get.logStartOffset)
   }

   @Test
   def testReplicaCanFetchFromLogStartOffsetAfterDeleteRecords(): Unit = {
-    val leaders = createTopic(topic, numPartitions = 1, replicationFactor = serverCount)
+    val leaders = createTopic(topic, numPartitions = 1, replicationFactor = brokerCount)
     val followerIndex = if (leaders(0) != servers(0).config.brokerId) 0 else 1

     def waitForFollowerLog(expectedStartOffset: Long, expectedEndOffset: Long): Unit = {
@@ -881,7 +881,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
     waitForFollowerLog(expectedStartOffset=3L, expectedEndOffset=100L)

     // after the new replica caught up, all replicas should have same log start offset
-    for (i <- 0 until serverCount)
+    for (i <- 0 until brokerCount)
       assertEquals(3, servers(i).replicaManager.localReplica(topicPartition).get.logStartOffset)

     // kill the same follower again, produce more records, and delete records beyond follower's LOE
@@ -896,7 +896,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
   @Test
   def testAlterLogDirsAfterDeleteRecords(): Unit = {
     client = AdminClient.create(createConfig)
-    createTopic(topic, numPartitions = 1, replicationFactor = serverCount)
+    createTopic(topic, numPartitions = 1, replicationFactor = brokerCount)
     val expectedLEO = 100
     val producer = createProducer()
     sendRecords(producer, expectedLEO, topicPartition)
@@ -905,7 +905,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
     val result = client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(3L)).asJava)
     result.all().get()
     // make sure we are in the expected state after delete records
-    for (i <- 0 until serverCount) {
+    for (i <- 0 until brokerCount) {
       assertEquals(3, servers(i).replicaManager.localReplica(topicPartition).get.logStartOffset)
       assertEquals(expectedLEO, servers(i).replicaManager.localReplica(topicPartition).get.logEndOffset)
     }
@@ -927,7 +927,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
   @Test
   def testOffsetsForTimesAfterDeleteRecords(): Unit = {
-    createTopic(topic, numPartitions = 2, replicationFactor = serverCount)
+    createTopic(topic, numPartitions = 2, replicationFactor = brokerCount)

     client = AdminClient.create(createConfig)
@@ -999,7 +999,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
   @Test
   def testDescribeConfigsForTopic(): Unit = {
-    createTopic(topic, numPartitions = 2, replicationFactor = serverCount)
+    createTopic(topic, numPartitions = 2, replicationFactor = brokerCount)

     client = AdminClient.create(createConfig)
     val existingTopic = new ConfigResource(ConfigResource.Type.TOPIC, topic)


@@ -54,7 +54,7 @@ import scala.collection.mutable.Buffer

 class AuthorizerIntegrationTest extends BaseRequestTest {
-  override def numBrokers: Int = 1
+  override def brokerCount: Int = 1
   val brokerId: Integer = 0

   def userPrincipal = KafkaPrincipal.ANONYMOUS
@@ -103,7 +103,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   producerConfig.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "50000")
   consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, group)

-  override def propertyOverrides(properties: Properties): Unit = {
+  override def brokerPropertyOverrides(properties: Properties): Unit = {
     properties.put(KafkaConfig.AuthorizerClassNameProp, classOf[SimpleAclAuthorizer].getName)
     properties.put(KafkaConfig.BrokerIdProp, brokerId.toString)
     properties.put(KafkaConfig.OffsetsTopicPartitionsProp, "1")


@@ -14,13 +14,14 @@ package kafka.api

 import java.time.Duration
 import java.util
+import java.util.Properties

 import org.apache.kafka.clients.consumer._
 import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.record.TimestampType
 import org.apache.kafka.common.{PartitionInfo, TopicPartition}
 import kafka.utils.{ShutdownableThread, TestUtils}
-import kafka.server.KafkaConfig
+import kafka.server.{BaseRequestTest, KafkaConfig}
 import org.junit.Assert._
 import org.junit.{Before, Test}
@@ -30,43 +31,49 @@ import org.apache.kafka.clients.producer.KafkaProducer
 import org.apache.kafka.common.errors.WakeupException
 import org.apache.kafka.common.internals.Topic

+import scala.collection.mutable
+
 /**
  * Integration tests for the consumer that cover basic usage as well as server failures
  */
-abstract class BaseConsumerTest extends IntegrationTestHarness {
+abstract class BaseConsumerTest extends BaseRequestTest {

   val epsilon = 0.1
-  val serverCount = 3
+  override def brokerCount: Int = 3

   val topic = "topic"
   val part = 0
   val tp = new TopicPartition(topic, part)
   val part2 = 1
   val tp2 = new TopicPartition(topic, part2)
+  val group = "my-test"
   val producerClientId = "ConsumerTestProducer"
   val consumerClientId = "ConsumerTestConsumer"
+  val groupMaxSessionTimeoutMs = 30000L

-  // configure the servers and clients
-  this.serverConfig.setProperty(KafkaConfig.ControlledShutdownEnableProp, "false") // speed up shutdown
-  this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "3") // don't want to lose offset
-  this.serverConfig.setProperty(KafkaConfig.OffsetsTopicPartitionsProp, "1")
-  this.serverConfig.setProperty(KafkaConfig.GroupMinSessionTimeoutMsProp, "100") // set small enough session timeout
-  this.serverConfig.setProperty(KafkaConfig.GroupMaxSessionTimeoutMsProp, "30000")
-  this.serverConfig.setProperty(KafkaConfig.GroupInitialRebalanceDelayMsProp, "10")
   this.producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "all")
   this.producerConfig.setProperty(ProducerConfig.CLIENT_ID_CONFIG, producerClientId)
   this.consumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, consumerClientId)
-  this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-test")
+  this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, group)
   this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
   this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
   this.consumerConfig.setProperty(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "100")

+  override protected def brokerPropertyOverrides(properties: Properties): Unit = {
+    properties.setProperty(KafkaConfig.ControlledShutdownEnableProp, "false") // speed up shutdown
+    properties.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "3") // don't want to lose offset
+    properties.setProperty(KafkaConfig.OffsetsTopicPartitionsProp, "1")
+    properties.setProperty(KafkaConfig.GroupMinSessionTimeoutMsProp, "100") // set small enough session timeout
+    properties.setProperty(KafkaConfig.GroupMaxSessionTimeoutMsProp, groupMaxSessionTimeoutMs.toString)
+    properties.setProperty(KafkaConfig.GroupInitialRebalanceDelayMsProp, "10")
+  }
+
   @Before
   override def setUp() {
     super.setUp()

     // create the test topic with all the brokers as replicas
-    createTopic(topic, 2, serverCount)
+    createTopic(topic, 2, brokerCount)
   }

   @Test
@@ -90,7 +97,7 @@ abstract class BaseConsumerTest extends IntegrationTestHarness {
   @Test
   def testCoordinatorFailover() {
     val listener = new TestConsumerReassignmentListener()
-    this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "5000")
+    this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "5001")
     this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "2000")
     val consumer = createConsumer()
@@ -130,6 +137,12 @@ abstract class BaseConsumerTest extends IntegrationTestHarness {
     }
   }

+  protected def createConsumerWithGroupId(groupId: String): KafkaConsumer[Array[Byte], Array[Byte]] = {
+    val groupOverrideConfig = new Properties
+    groupOverrideConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId)
+    createConsumer(configOverrides = groupOverrideConfig)
+  }
+
   protected def sendRecords(producer: KafkaProducer[Array[Byte], Array[Byte]], numRecords: Int,
                             tp: TopicPartition): Seq[ProducerRecord[Array[Byte], Array[Byte]]] = {
     val records = (0 until numRecords).map { i =>
@@ -223,6 +236,100 @@ abstract class BaseConsumerTest extends IntegrationTestHarness {
     assertEquals(None, commitCallback.error)
   }

+  /**
+   * Create 'numOfConsumersToAdd' consumers, add them to the consumer group 'consumerGroup', and create corresponding
+   * pollers for these consumers. Wait for partition re-assignment and validate it.
+   *
+   * Currently, assignment validation requires that the total number of partitions is greater than or equal to the
+   * number of consumers, so subscriptions.size must be greater than or equal to the resulting number of consumers in the group.
+   *
+   * @param numOfConsumersToAdd number of consumers to create and add to the consumer group
+   * @param consumerGroup current consumer group
+   * @param consumerPollers current consumer pollers
+   * @param topicsToSubscribe topics to which the new consumers will subscribe
+   * @param subscriptions set of all topic partitions
+   */
+  def addConsumersToGroupAndWaitForGroupAssignment(numOfConsumersToAdd: Int,
+                                                   consumerGroup: mutable.Buffer[KafkaConsumer[Array[Byte], Array[Byte]]],
+                                                   consumerPollers: mutable.Buffer[ConsumerAssignmentPoller],
+                                                   topicsToSubscribe: List[String],
+                                                   subscriptions: Set[TopicPartition],
+                                                   group: String = group): (mutable.Buffer[KafkaConsumer[Array[Byte], Array[Byte]]], mutable.Buffer[ConsumerAssignmentPoller]) = {
+    assertTrue(consumerGroup.size + numOfConsumersToAdd <= subscriptions.size)
+    addConsumersToGroup(numOfConsumersToAdd, consumerGroup, consumerPollers, topicsToSubscribe, subscriptions, group)
+    // wait until topics get re-assigned and validate assignment
+    validateGroupAssignment(consumerPollers, subscriptions)
+
+    (consumerGroup, consumerPollers)
+  }
+
+  /**
+   * Create 'numOfConsumersToAdd' consumers, add them to the consumer group 'consumerGroup', and create corresponding
+   * pollers for these consumers.
+   *
+   * @param numOfConsumersToAdd number of consumers to create and add to the consumer group
+   * @param consumerGroup current consumer group
+   * @param consumerPollers current consumer pollers
+   * @param topicsToSubscribe topics to which the new consumers will subscribe
+   * @param subscriptions set of all topic partitions
+   */
+  def addConsumersToGroup(numOfConsumersToAdd: Int,
+                          consumerGroup: mutable.Buffer[KafkaConsumer[Array[Byte], Array[Byte]]],
+                          consumerPollers: mutable.Buffer[ConsumerAssignmentPoller],
+                          topicsToSubscribe: List[String],
+                          subscriptions: Set[TopicPartition],
+                          group: String = group): (mutable.Buffer[KafkaConsumer[Array[Byte], Array[Byte]]], mutable.Buffer[ConsumerAssignmentPoller]) = {
+    for (_ <- 0 until numOfConsumersToAdd) {
+      val consumer = createConsumerWithGroupId(group)
+      consumerGroup += consumer
+      consumerPollers += subscribeConsumerAndStartPolling(consumer, topicsToSubscribe)
+    }
+
+    (consumerGroup, consumerPollers)
+  }
+
+  /**
+   * Wait for consumers to get partition assignment and validate it.
+   *
+   * @param consumerPollers consumer pollers corresponding to the consumer group we are testing
+   * @param subscriptions set of all topic partitions
+   * @param msg message to print when waiting for/validating the assignment fails
+   */
+  def validateGroupAssignment(consumerPollers: mutable.Buffer[ConsumerAssignmentPoller],
+                              subscriptions: Set[TopicPartition],
+                              msg: Option[String] = None,
+                              waitTime: Long = 10000L): Unit = {
+    val assignments = mutable.Buffer[Set[TopicPartition]]()
+    TestUtils.waitUntilTrue(() => {
+      assignments.clear()
+      consumerPollers.foreach(assignments += _.consumerAssignment())
+      isPartitionAssignmentValid(assignments, subscriptions)
+    }, msg.getOrElse(s"Did not get valid assignment for partitions $subscriptions. Instead, got $assignments"), waitTime)
+  }
+
+  /**
+   * Subscribes consumer 'consumer' to the given list of topics 'topicsToSubscribe', creates a
+   * consumer poller and starts polling.
+   * Assumes that the consumer is not subscribed to any topics yet.
+   *
+   * @param consumer consumer
+   * @param topicsToSubscribe topics that this consumer will subscribe to
+   * @return consumer poller for the given consumer
+   */
+  def subscribeConsumerAndStartPolling(consumer: Consumer[Array[Byte], Array[Byte]],
+                                       topicsToSubscribe: List[String],
+                                       partitionsToAssign: Set[TopicPartition] = Set.empty[TopicPartition]): ConsumerAssignmentPoller = {
+    assertEquals(0, consumer.assignment().size)
+    val consumerPoller = if (topicsToSubscribe.nonEmpty)
+      new ConsumerAssignmentPoller(consumer, topicsToSubscribe)
+    else
+      new ConsumerAssignmentPoller(consumer, partitionsToAssign)
+
+    consumerPoller.start()
+    consumerPoller
+  }
+
   protected def awaitRebalance(consumer: Consumer[_, _], rebalanceListener: TestConsumerReassignmentListener): Unit = {
     val numReassignments = rebalanceListener.callsToAssigned
     TestUtils.pollUntilTrue(consumer, () => rebalanceListener.callsToAssigned > numReassignments,
@@ -253,13 +360,25 @@ abstract class BaseConsumerTest extends IntegrationTestHarness {
   }

   protected class ConsumerAssignmentPoller(consumer: Consumer[Array[Byte], Array[Byte]],
-                                           topicsToSubscribe: List[String]) extends ShutdownableThread("daemon-consumer-assignment", false)
+                                           topicsToSubscribe: List[String],
+                                           partitionsToAssign: Set[TopicPartition]) extends ShutdownableThread("daemon-consumer-assignment", false)
   {
-    @volatile private var partitionAssignment: Set[TopicPartition] = Set.empty[TopicPartition]
-    private var topicsSubscription = topicsToSubscribe
-    @volatile private var subscriptionChanged = false
+    def this(consumer: Consumer[Array[Byte], Array[Byte]], topicsToSubscribe: List[String]) {
+      this(consumer, topicsToSubscribe, Set.empty[TopicPartition])
+    }

-    val rebalanceListener = new ConsumerRebalanceListener {
+    def this(consumer: Consumer[Array[Byte], Array[Byte]], partitionsToAssign: Set[TopicPartition]) {
+      this(consumer, List.empty[String], partitionsToAssign)
+    }
+
+    @volatile var thrownException: Option[Throwable] = None
+    @volatile var receivedMessages = 0
+
+    @volatile private var partitionAssignment: Set[TopicPartition] = partitionsToAssign
+    @volatile private var subscriptionChanged = false
+    private var topicsSubscription = topicsToSubscribe
+
+    val rebalanceListener: ConsumerRebalanceListener = new ConsumerRebalanceListener {
       override def onPartitionsAssigned(partitions: util.Collection[TopicPartition]) = {
         partitionAssignment = collection.immutable.Set(consumer.assignment().asScala.toArray: _*)
       }
@@ -268,7 +387,11 @@ abstract class BaseConsumerTest extends IntegrationTestHarness {
         partitionAssignment = Set.empty[TopicPartition]
       }
     }
-    consumer.subscribe(topicsToSubscribe.asJava, rebalanceListener)
+
+    if (partitionAssignment.isEmpty) {
+      consumer.subscribe(topicsToSubscribe.asJava, rebalanceListener)
+    } else {
+      consumer.assign(partitionAssignment.asJava)
+    }

     def consumerAssignment(): Set[TopicPartition] = {
       partitionAssignment
@@ -285,14 +408,16 @@ abstract class BaseConsumerTest extends IntegrationTestHarness {
     * @param newTopicsToSubscribe
     */
    def subscribe(newTopicsToSubscribe: List[String]): Unit = {
-      if (subscriptionChanged) {
+      if (subscriptionChanged)
        throw new IllegalStateException("Do not call subscribe until the previous subscribe request is processed.")
-      }
+      if (partitionsToAssign.nonEmpty)
+        throw new IllegalStateException("Cannot call subscribe when configured to use manual partition assignment")
+
      topicsSubscription = newTopicsToSubscribe
      subscriptionChanged = true
    }

-    def isSubscribeRequestProcessed(): Boolean = {
+    def isSubscribeRequestProcessed: Boolean = {
      !subscriptionChanged
    }
@@ -308,9 +433,12 @@ abstract class BaseConsumerTest extends IntegrationTestHarness {
         subscriptionChanged = false
       }
       try {
-        consumer.poll(Duration.ofMillis(50))
+        receivedMessages += consumer.poll(Duration.ofMillis(50)).count()
       } catch {
         case _: WakeupException => // ignore for shutdown
+        case e: Throwable =>
+          thrownException = Some(e)
+          throw e
       }
     }
   }


@@ -33,7 +33,7 @@ import scala.collection.JavaConverters._

 abstract class BaseQuotaTest extends IntegrationTestHarness {

-  override val serverCount = 2
+  override val brokerCount = 2

   protected def producerClientId = "QuotasTestProducer-1"
   protected def consumerClientId = "QuotasTestConsumer-1"
@@ -70,7 +70,7 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
     super.setUp()

     val numPartitions = 1
-    val leaders = createTopic(topic1, numPartitions, serverCount)
+    val leaders = createTopic(topic1, numPartitions, brokerCount)
     leaderNode = if (leaders(0) == servers.head.config.brokerId) servers.head else servers(1)
     followerNode = if (leaders(0) != servers.head.config.brokerId) servers.head else servers(1)
     quotaTestClients = createQuotaTestClients(topic1, leaderNode)


@@ -15,12 +15,10 @@ package kafka.api

 import java.time
 import java.util.concurrent._
-import java.util.concurrent.atomic.AtomicBoolean
-import java.util.concurrent.locks.ReentrantLock
 import java.util.{Collection, Collections, Properties}

 import util.control.Breaks._
-import kafka.server.{BaseRequestTest, KafkaConfig}
+import kafka.server.KafkaConfig
 import kafka.utils.{CoreUtils, Logging, ShutdownableThread, TestUtils}
 import org.apache.kafka.clients.consumer._
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
@@ -29,25 +27,23 @@ import org.apache.kafka.common.errors.GroupMaxSizeReachedException
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.{FindCoordinatorRequest, FindCoordinatorResponse}
 import org.junit.Assert._
-import org.junit.{After, Before, Ignore, Test}
+import org.junit.{After, Ignore, Test}

 import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.duration.Duration
-import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutor, Future => SFuture}
+import scala.collection.mutable

 /**
  * Integration tests for the consumer that cover basic usage as well as server failures
  */
-class ConsumerBounceTest extends BaseRequestTest with Logging {
-  val topic = "topic"
-  val part = 0
-  val tp = new TopicPartition(topic, part)
+class ConsumerBounceTest extends BaseConsumerTest with Logging {
   val maxGroupSize = 5

   // Time to process commit and leave group requests in tests when brokers are available
-  val gracefulCloseTimeMs = 1000
-  val executor = Executors.newScheduledThreadPool(2)
+  val gracefulCloseTimeMs = Some(1000L)
+  val executor: ScheduledExecutorService = Executors.newScheduledThreadPool(2)
+  val consumerPollers: mutable.Buffer[ConsumerAssignmentPoller] = mutable.Buffer[ConsumerAssignmentPoller]()
+
+  this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")

   override def generateConfigs = {
     generateKafkaConfigs()
@@ -63,21 +59,14 @@ class ConsumerBounceTest extends BaseRequestTest with Logging {
     properties.put(KafkaConfig.UncleanLeaderElectionEnableProp, "true")
     properties.put(KafkaConfig.AutoCreateTopicsEnableProp, "false")

-    FixedPortTestUtils.createBrokerConfigs(numBrokers, zkConnect, enableControlledShutdown = false)
+    FixedPortTestUtils.createBrokerConfigs(brokerCount, zkConnect, enableControlledShutdown = false)
       .map(KafkaConfig.fromProps(_, properties))
   }

-  @Before
-  override def setUp() {
-    super.setUp()
-
-    // create the test topic with all the brokers as replicas
-    createTopic(topic, 1, numBrokers)
-  }
-
   @After
   override def tearDown() {
     try {
+      consumerPollers.foreach(_.shutdown())
       executor.shutdownNow()
       // Wait for any active tasks to terminate to ensure consumer is not closed while being used from another thread
       assertTrue("Executor did not terminate", executor.awaitTermination(5000, TimeUnit.MILLISECONDS))
@@ -176,7 +165,7 @@ class ConsumerBounceTest extends BaseRequestTest with Logging {
     val consumer = createConsumer()
     consumer.subscribe(Collections.singleton(newtopic))
     executor.schedule(new Runnable {
-        def run() = createTopic(newtopic, numPartitions = numBrokers, replicationFactor = numBrokers)
+        def run() = createTopic(newtopic, numPartitions = brokerCount, replicationFactor = brokerCount)
       }, 2, TimeUnit.SECONDS)
     consumer.poll(time.Duration.ZERO)
@@ -201,18 +190,22 @@ class ConsumerBounceTest extends BaseRequestTest with Logging {
       assertEquals(0, remainingRecords)
     }

+    val poller = new ConsumerAssignmentPoller(consumer, List(newtopic))
+    consumerPollers += poller
+    poller.start()
     sendRecords(numRecords, newtopic)
-    receiveRecords(consumer, numRecords, 10000)
+    receiveExactRecords(poller, numRecords, 10000)
+    poller.shutdown()

     servers.foreach(server => killBroker(server.config.brokerId))
     Thread.sleep(500)
     restartDeadBrokers()

-    val future = executor.submit(new Runnable {
-      def run() = receiveRecords(consumer, numRecords, 10000)
-    })
+    val poller2 = new ConsumerAssignmentPoller(consumer, List(newtopic))
+    consumerPollers += poller2
+    poller2.start()
     sendRecords(numRecords, newtopic)
-    future.get
+    receiveExactRecords(poller, numRecords, 10000L)
   }

   @Test
@@ -233,7 +226,7 @@ class ConsumerBounceTest extends BaseRequestTest with Logging {
    */
   private def checkCloseGoodPath(numRecords: Int, groupId: String) {
     val consumer = createConsumerAndReceive(groupId, false, numRecords)
-    val future = submitCloseAndValidate(consumer, Long.MaxValue, None, Some(gracefulCloseTimeMs))
+    val future = submitCloseAndValidate(consumer, Long.MaxValue, None, gracefulCloseTimeMs)
     future.get
     checkClosedState(groupId, numRecords)
   }
@@ -251,8 +244,8 @@ class ConsumerBounceTest extends BaseRequestTest with Logging {
     killBroker(findCoordinator(dynamicGroup))
     killBroker(findCoordinator(manualGroup))

-    val future1 = submitCloseAndValidate(consumer1, Long.MaxValue, None, Some(gracefulCloseTimeMs))
-    val future2 = submitCloseAndValidate(consumer2, Long.MaxValue, None, Some(gracefulCloseTimeMs))
+    val future1 = submitCloseAndValidate(consumer1, Long.MaxValue, None, gracefulCloseTimeMs)
+    val future2 = submitCloseAndValidate(consumer2, Long.MaxValue, None, gracefulCloseTimeMs)
     future1.get
     future2.get
@@ -302,6 +295,7 @@ class ConsumerBounceTest extends BaseRequestTest with Logging {
    */
   @Test
   def testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup(): Unit = {
+    val group = "group-max-size-test"
     val topic = "group-max-size-test"
     val maxGroupSize = 2
     val consumerCount = maxGroupSize + 1
@@ -311,77 +305,51 @@ class ConsumerBounceTest extends BaseRequestTest with Logging {
       // ensure even record distribution per partition
       recordsProduced += partitionCount - recordsProduced % partitionCount
     }
-    val executor = Executors.newScheduledThreadPool(consumerCount * 2)
     this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "60000")
     this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000")
     this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
-    val producer = createProducer()
-    createTopic(topic, numPartitions = partitionCount, replicationFactor = numBrokers)
-    val stableConsumers = createConsumersWithGroupId("group2", consumerCount, executor, topic = topic)
-
-    // assert group is stable and working
-    sendRecords(producer, recordsProduced, topic, numPartitions = Some(partitionCount))
-    stableConsumers.foreach { cons => {
-      receiveAndCommit(cons, recordsProduced / consumerCount, 10000)
-    }}
+    val partitions = createTopicPartitions(topic, numPartitions = partitionCount, replicationFactor = brokerCount)
+
+    addConsumersToGroupAndWaitForGroupAssignment(consumerCount, mutable.Buffer[KafkaConsumer[Array[Byte], Array[Byte]]](),
+      consumerPollers, List[String](topic), partitions, group)

     // roll all brokers with a lesser max group size to make sure coordinator has the new config
     val newConfigs = generateKafkaConfigs(maxGroupSize.toString)
-    val kickedConsumerOut = new AtomicBoolean(false)
     var kickedOutConsumerIdx: Option[Int] = None
-    val lock = new ReentrantLock
+
     // restart brokers until the group moves to a Coordinator with the new config
     breakable { for (broker <- servers.indices) {
       killBroker(broker)
-      sendRecords(producer, recordsProduced, topic, numPartitions = Some(partitionCount))
-
-      var successfulConsumes = 0
-
-      // compute consumptions in a non-blocking way in order to account for the rebalance once the group.size takes effect
-      val consumeFutures = new ArrayBuffer[SFuture[Any]]
-      implicit val executorContext: ExecutionContextExecutor = ExecutionContext.fromExecutor(executor)
-      stableConsumers.indices.foreach(idx => {
-        val currentConsumer = stableConsumers(idx)
-        val consumeFuture = SFuture {
-          try {
-            receiveAndCommit(currentConsumer, recordsProduced / consumerCount, 10000)
-            CoreUtils.inLock(lock) { successfulConsumes += 1 }
-          } catch {
-            case e: Throwable =>
-              if (!e.isInstanceOf[GroupMaxSizeReachedException]) {
-                throw e
-              }
-              if (!kickedConsumerOut.compareAndSet(false, true)) {
-                fail(s"Received more than one ${classOf[GroupMaxSizeReachedException]}")
-              }
-              kickedOutConsumerIdx = Some(idx)
-          }
-        }
-        consumeFutures += consumeFuture
-      })
-      Await.result(SFuture.sequence(consumeFutures), Duration("12sec"))
-
-      if (kickedConsumerOut.get()) {
-        // validate the rest N-1 consumers consumed successfully
-        assertEquals(maxGroupSize, successfulConsumes)
+      consumerPollers.indices.foreach(idx => {
+        consumerPollers(idx).thrownException match {
+          case Some(thrownException) =>
+            if (!thrownException.isInstanceOf[GroupMaxSizeReachedException]) {
+              throw thrownException
+            }
+            if (kickedOutConsumerIdx.isDefined) {
+              fail(s"Received more than one ${classOf[GroupMaxSizeReachedException]}")
+            }
+            kickedOutConsumerIdx = Some(idx)
+          case None =>
+        }
+      })
+
+      if (kickedOutConsumerIdx.isDefined)
         break
-      }

       val config = newConfigs(broker)
       servers(broker) = TestUtils.createServer(config, time = brokerTime(config.brokerId))
       restartDeadBrokers()
     }}
-    if (!kickedConsumerOut.get())
+    if (kickedOutConsumerIdx.isEmpty)
       fail(s"Should have received an ${classOf[GroupMaxSizeReachedException]} during the cluster roll")
+    restartDeadBrokers()

     // assert that the group has gone through a rebalance and shed off one consumer
-    stableConsumers.remove(kickedOutConsumerIdx.get)
-    sendRecords(producer, recordsProduced, topic, numPartitions = Some(partitionCount))
-    // should be only maxGroupSize consumers left in the group
-    stableConsumers.foreach { cons => {
-      receiveAndCommit(cons, recordsProduced / maxGroupSize, 10000)
-    }}
+    consumerPollers.remove(kickedOutConsumerIdx.get).shutdown()
+    sendRecords(createProducer(), recordsProduced, topic, numPartitions = Some(partitionCount))
+    TestUtils.waitUntilTrue(() => {
+      consumerPollers.forall(p => p.receivedMessages >= recordsProduced / consumerCount)
+    }, "The remaining consumers in the group could not fetch the expected records", 10000L)
   }

   /**
@@ -389,70 +357,30 @@ class ConsumerBounceTest extends BaseRequestTest with Logging {
    */
   @Test
   def testConsumerReceivesFatalExceptionWhenGroupPassesMaxSize(): Unit = {
-    val topic = "group-max-size-test"
-    val groupId = "group1"
-    val executor = Executors.newScheduledThreadPool(maxGroupSize * 2)
-    createTopic(topic, maxGroupSize, numBrokers)
+    val group = "fatal-exception-test"
+    val topic = "fatal-exception-test"
     this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "60000")
     this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000")
     this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
+    val partitions = createTopicPartitions(topic, numPartitions = maxGroupSize, replicationFactor = brokerCount)

     // Create N+1 consumers in the same consumer group and assert that the N+1th consumer receives a fatal error when it tries to join the group
-    val stableConsumers = createConsumersWithGroupId(groupId, maxGroupSize, executor, topic)
-    val newConsumer = createConsumerWithGroupId(groupId)
-    var failedRebalance = false
-    var exception: Exception = null
-    waitForRebalance(5000, subscribeAndPoll(newConsumer, executor = executor, onException = e => {failedRebalance = true; exception = e}),
-      executor = executor, stableConsumers:_*)
-    assertTrue("Rebalance did not fail as expected", failedRebalance)
-    assertTrue(exception.isInstanceOf[GroupMaxSizeReachedException])
+    addConsumersToGroupAndWaitForGroupAssignment(maxGroupSize, mutable.Buffer[KafkaConsumer[Array[Byte], Array[Byte]]](),
+      consumerPollers, List[String](topic), partitions, group)
+    val (_, rejectedConsumerPollers) = addConsumersToGroup(1,
+      mutable.Buffer[KafkaConsumer[Array[Byte], Array[Byte]]](), mutable.Buffer[ConsumerAssignmentPoller](), List[String](topic), partitions, group)
+    val rejectedConsumer = rejectedConsumerPollers.head
+    TestUtils.waitUntilTrue(() => {
+      rejectedConsumer.thrownException.isDefined
+    }, "Extra consumer did not throw an exception")
+    assertTrue(rejectedConsumer.thrownException.get.isInstanceOf[GroupMaxSizeReachedException])

     // assert group continues to live
-    val producer = createProducer()
-    sendRecords(producer, maxGroupSize * 100, topic, numPartitions = Some(maxGroupSize))
-    stableConsumers.foreach { cons => {
-      receiveExactRecords(cons, 100, 10000)
-    }}
-  }
-
-  /**
-   * Creates N consumers with the same group ID and ensures the group rebalances properly at each step
-   */
-  private def createConsumersWithGroupId(groupId: String, consumerCount: Int, executor: ExecutorService, topic: String): ArrayBuffer[KafkaConsumer[Array[Byte], Array[Byte]]] = {
-    val stableConsumers = ArrayBuffer[KafkaConsumer[Array[Byte], Array[Byte]]]()
-    for (_ <- 1.to(consumerCount)) {
-      val newConsumer = createConsumerWithGroupId(groupId)
-      waitForRebalance(5000, subscribeAndPoll(newConsumer, executor = executor, topic = topic),
-        executor = executor, stableConsumers:_*)
-      stableConsumers += newConsumer
-    }
-    stableConsumers
-  }
-
-  def subscribeAndPoll(consumer: KafkaConsumer[Array[Byte], Array[Byte]], executor: ExecutorService, revokeSemaphore: Option[Semaphore] = None,
-                       onException: Exception => Unit = e => { throw e }, topic: String = topic, pollTimeout: Int = 1000): Future[Any] = {
-    executor.submit(CoreUtils.runnable {
-      try {
-        consumer.subscribe(Collections.singletonList(topic))
-        consumer.poll(java.time.Duration.ofMillis(pollTimeout))
-      } catch {
-        case e: Exception => onException.apply(e)
-      }
-    }, 0)
-  }
-
-  def waitForRebalance(timeoutMs: Long, future: Future[Any], executor: ExecutorService, otherConsumers: KafkaConsumer[Array[Byte], Array[Byte]]*) {
-    val startMs = System.currentTimeMillis
-    implicit val executorContext: ExecutionContextExecutor = ExecutionContext.fromExecutor(executor)
-    while (System.currentTimeMillis < startMs + timeoutMs && !future.isDone) {
-      val consumeFutures = otherConsumers.map(consumer => SFuture {
-        consumer.poll(time.Duration.ofMillis(1000))
-      })
-      Await.result(SFuture.sequence(consumeFutures), Duration("1500ms"))
-    }
-    assertTrue("Rebalance did not complete in time", future.isDone)
+    sendRecords(createProducer(), maxGroupSize * 100, topic, numPartitions = Some(partitions.size))
+    TestUtils.waitUntilTrue(() => {
+      consumerPollers.forall(p => p.receivedMessages >= 100)
+    }, "The consumers in the group could not fetch the expected records", 10000L)
   }

   /**
@@ -463,7 +391,7 @@ class ConsumerBounceTest extends BaseRequestTest with Logging {
   @Test
   def testCloseDuringRebalance() {
     val topic = "closetest"
-    createTopic(topic, 10, numBrokers)
+    createTopic(topic, 10, brokerCount)
     this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "60000")
     this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000")
     this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
@@ -510,7 +438,7 @@ class ConsumerBounceTest extends BaseRequestTest with Logging {
     val rebalanceFuture = createConsumerToRebalance()

     // consumer1 should leave group and close immediately even though rebalance is in progress
-    val closeFuture1 = submitCloseAndValidate(consumer1, Long.MaxValue, None, Some(gracefulCloseTimeMs))
+    val closeFuture1 = submitCloseAndValidate(consumer1, Long.MaxValue, None, gracefulCloseTimeMs)

     // Rebalance should complete without waiting for consumer1 to timeout since consumer1 has left the group
     waitForRebalance(2000, rebalanceFuture, consumer2)
@@ -528,40 +456,22 @@ class ConsumerBounceTest extends BaseRequestTest with Logging {
     closeFuture2.get(2000, TimeUnit.MILLISECONDS)
   }

-  private def createConsumerWithGroupId(groupId: String): KafkaConsumer[Array[Byte], Array[Byte]] = {
-    consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId)
-    createConsumer()
-  }
-
   private def createConsumerAndReceive(groupId: String, manualAssign: Boolean, numRecords: Int): KafkaConsumer[Array[Byte], Array[Byte]] = {
     val consumer = createConsumerWithGroupId(groupId)
-    if (manualAssign)
-      consumer.assign(Collections.singleton(tp))
+    val consumerPoller = if (manualAssign)
+      subscribeConsumerAndStartPolling(consumer, List(), Set(tp))
     else
-      consumer.subscribe(Collections.singleton(topic))
-    receiveExactRecords(consumer, numRecords)
+      subscribeConsumerAndStartPolling(consumer, List(topic))
+
+    receiveExactRecords(consumerPoller, numRecords)
+    consumerPoller.shutdown()
     consumer
   }

-  private def receiveRecords(consumer: KafkaConsumer[Array[Byte], Array[Byte]], numRecords: Int, timeoutMs: Long = 60000): Long = {
-    var received = 0L
-    val endTimeMs = System.currentTimeMillis + timeoutMs
-    while (received < numRecords && System.currentTimeMillis < endTimeMs)
-      received += consumer.poll(time.Duration.ofMillis(100)).count()
-    received
-  }
-
-  private def receiveExactRecords(consumer: KafkaConsumer[Array[Byte], Array[Byte]], numRecords: Int, timeoutMs: Long = 60000): Unit = {
-    val received = receiveRecords(consumer, numRecords, timeoutMs)
-    assertEquals(numRecords, received)
-  }
-
-  @throws(classOf[CommitFailedException])
-  private def receiveAndCommit(consumer: KafkaConsumer[Array[Byte], Array[Byte]], numRecords: Int, timeoutMs: Long): Unit = {
-    val received = receiveRecords(consumer, numRecords, timeoutMs)
-    assertTrue(s"Received $received, expected at least $numRecords", numRecords <= received)
-    consumer.commitSync()
+  private def receiveExactRecords(consumer: ConsumerAssignmentPoller, numRecords: Int, timeoutMs: Long = 60000): Unit = {
+    TestUtils.waitUntilTrue(() => {
+      consumer.receivedMessages == numRecords
+    }, s"Consumer did not receive expected $numRecords. It received ${consumer.receivedMessages}", timeoutMs)
   }

   private def submitCloseAndValidate(consumer: KafkaConsumer[Array[Byte], Array[Byte]],
@@ -617,6 +527,12 @@ class ConsumerBounceTest extends BaseRequestTest with Logging {
     }
   }

+  private def createTopicPartitions(topic: String, numPartitions: Int = 1, replicationFactor: Int = 1,
+                                    topicConfig: Properties = new Properties): Set[TopicPartition] = {
+    createTopic(topic, numPartitions = numPartitions, replicationFactor = replicationFactor, topicConfig = topicConfig)
+    Range(0, numPartitions).map(part => new TopicPartition(topic, part)).toSet
+  }
+
   private def sendRecords(producer: KafkaProducer[Array[Byte], Array[Byte]],
                           numRecords: Int,
                           topic: String = this.topic,


@@ -48,7 +48,7 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup {

   override protected def interBrokerListenerName: ListenerName = new ListenerName("BROKER")
   override protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks"))
-  override val serverCount: Int = 2
+  override val brokerCount: Int = 2

   private val kafkaServerSaslMechanisms = Seq("SCRAM-SHA-256")
   private val kafkaClientSaslMechanism = "SCRAM-SHA-256"
@@ -161,7 +161,7 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup {
     user.waitForQuotaUpdate(8000, 2500, defaultRequestQuota)
     user.produceConsume(expectProduceThrottle = true, expectConsumeThrottle = true)

-    assertEquals(serverCount, callbackInstances.get)
+    assertEquals(brokerCount, callbackInstances.get)
   }

   /**


@@ -30,7 +30,7 @@ import org.junit.{After, Before, Test}
 import scala.collection.JavaConverters._

 class DescribeAuthorizedOperationsTest extends IntegrationTestHarness with SaslSetup {
-  override val serverCount = 1
+  override val brokerCount = 1

   this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true")
   this.serverConfig.setProperty(KafkaConfig.AuthorizerClassNameProp, classOf[SimpleAclAuthorizer].getName)


@@ -58,7 +58,7 @@ import scala.collection.JavaConverters._
  * would end up with ZooKeeperTestHarness twice.
  */
 abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup {
-  override val serverCount = 3
+  override val brokerCount = 3

   override def configureSecurityBeforeServersStart() {
     AclCommand.main(clusterActionArgs)


@@ -33,9 +33,9 @@ class GroupAuthorizerIntegrationTest extends AuthorizerIntegrationTest {
   override val kafkaPrincipalType = GroupPrincipalType
   override def userPrincipal = TestGroupPrincipal

-  override def propertyOverrides(properties: Properties): Unit = {
+  override def brokerPropertyOverrides(properties: Properties): Unit = {
     properties.setProperty(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG,
       classOf[GroupPrincipalBuilder].getName)
-    super.propertyOverrides(properties)
+    super.brokerPropertyOverrides(properties)
   }
 }


@@ -37,7 +37,7 @@ import scala.collection.mutable
  * A helper class for writing integration tests that involve producers, consumers, and servers
  */
 abstract class IntegrationTestHarness extends KafkaServerTestHarness {
-  protected def serverCount: Int
+  protected def brokerCount: Int
   protected def logDirCount: Int = 1

   val producerConfig = new Properties
@@ -49,10 +49,20 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {

   protected def interBrokerListenerName: ListenerName = listenerName

+  protected def modifyConfigs(props: Seq[Properties]): Unit = {
+    configureListeners(props)
+    props.foreach(_ ++= serverConfig)
+  }
+
   override def generateConfigs: Seq[KafkaConfig] = {
-    val cfgs = TestUtils.createBrokerConfigs(serverCount, zkConnect, interBrokerSecurityProtocol = Some(securityProtocol),
+    val cfgs = TestUtils.createBrokerConfigs(brokerCount, zkConnect, interBrokerSecurityProtocol = Some(securityProtocol),
       trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties, logDirCount = logDirCount)
-    cfgs.foreach { config =>
+    modifyConfigs(cfgs)
+    cfgs.map(KafkaConfig.fromProps)
+  }
+
+  protected def configureListeners(props: Seq[Properties]): Unit = {
+    props.foreach { config =>
       config.remove(KafkaConfig.InterBrokerSecurityProtocolProp)
       config.setProperty(KafkaConfig.InterBrokerListenerNameProp, interBrokerListenerName.value)
@@ -63,8 +73,6 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
       config.setProperty(KafkaConfig.ListenersProp, listeners)
       config.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, listenerSecurityMap)
     }
-    cfgs.foreach(_ ++= serverConfig)
-    cfgs.map(KafkaConfig.fromProps)
   }

   @Before
@Before @Before


@@ -41,7 +41,7 @@ class LegacyAdminClientTest extends IntegrationTestHarness with Logging {

   val producerCount = 1
   val consumerCount = 2
-  val serverCount = 3
+  val brokerCount = 3

   val groupId = "my-test"
   val clientId = "consumer-498"
@@ -70,7 +70,7 @@ class LegacyAdminClientTest extends IntegrationTestHarness with Logging {
   override def setUp() {
     super.setUp()
     client = AdminClient.createSimplePlaintext(this.brokerList)
-    createTopic(topic, 2, serverCount)
+    createTopic(topic, 2, brokerCount)
   }

   @After


@ -33,7 +33,7 @@ import org.junit.Assert.{assertEquals, assertNotEquals, assertTrue}
class LogAppendTimeTest extends IntegrationTestHarness { class LogAppendTimeTest extends IntegrationTestHarness {
val producerCount: Int = 1 val producerCount: Int = 1
val consumerCount: Int = 1 val consumerCount: Int = 1
val serverCount: Int = 2 val brokerCount: Int = 2
// This will be used for the offsets topic as well // This will be used for the offsets topic as well
serverConfig.put(KafkaConfig.LogMessageTimestampTypeProp, TimestampType.LOG_APPEND_TIME.name) serverConfig.put(KafkaConfig.LogMessageTimestampTypeProp, TimestampType.LOG_APPEND_TIME.name)


@ -33,7 +33,7 @@ import scala.collection.JavaConverters._
class MetricsTest extends IntegrationTestHarness with SaslSetup { class MetricsTest extends IntegrationTestHarness with SaslSetup {
override val serverCount = 1 override val brokerCount = 1
override protected def listenerName = new ListenerName("CLIENT") override protected def listenerName = new ListenerName("CLIENT")
private val kafkaClientSaslMechanism = "PLAIN" private val kafkaClientSaslMechanism = "PLAIN"


@ -19,7 +19,6 @@ import java.util.regex.Pattern
import java.util.{Collections, Locale, Optional, Properties} import java.util.{Collections, Locale, Optional, Properties}
import kafka.log.LogConfig import kafka.log.LogConfig
import kafka.server.KafkaConfig
import kafka.utils.TestUtils import kafka.utils.TestUtils
import org.apache.kafka.clients.consumer._ import org.apache.kafka.clients.consumer._
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord} import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
@ -38,6 +37,8 @@ import scala.collection.mutable.Buffer
import kafka.server.QuotaType import kafka.server.QuotaType
import kafka.server.KafkaServer import kafka.server.KafkaServer
import scala.collection.mutable
/* We have some tests in this class instead of `BaseConsumerTest` in order to keep the build time under control. */ /* We have some tests in this class instead of `BaseConsumerTest` in order to keep the build time under control. */
class PlaintextConsumerTest extends BaseConsumerTest { class PlaintextConsumerTest extends BaseConsumerTest {
@ -342,17 +343,17 @@ class PlaintextConsumerTest extends BaseConsumerTest {
sendRecords(producer, numRecords, tp) sendRecords(producer, numRecords, tp)
val topic1 = "tblablac" // matches subscribed pattern val topic1 = "tblablac" // matches subscribed pattern
createTopic(topic1, 2, serverCount) createTopic(topic1, 2, brokerCount)
sendRecords(producer, numRecords = 1000, new TopicPartition(topic1, 0)) sendRecords(producer, numRecords = 1000, new TopicPartition(topic1, 0))
sendRecords(producer, numRecords = 1000, new TopicPartition(topic1, 1)) sendRecords(producer, numRecords = 1000, new TopicPartition(topic1, 1))
val topic2 = "tblablak" // does not match subscribed pattern val topic2 = "tblablak" // does not match subscribed pattern
createTopic(topic2, 2, serverCount) createTopic(topic2, 2, brokerCount)
sendRecords(producer, numRecords = 1000, new TopicPartition(topic2, 0)) sendRecords(producer, numRecords = 1000, new TopicPartition(topic2, 0))
sendRecords(producer, numRecords = 1000, new TopicPartition(topic2, 1)) sendRecords(producer, numRecords = 1000, new TopicPartition(topic2, 1))
val topic3 = "tblab1" // does not match subscribed pattern val topic3 = "tblab1" // does not match subscribed pattern
createTopic(topic3, 2, serverCount) createTopic(topic3, 2, brokerCount)
sendRecords(producer, numRecords = 1000, new TopicPartition(topic3, 0)) sendRecords(producer, numRecords = 1000, new TopicPartition(topic3, 0))
sendRecords(producer, numRecords = 1000, new TopicPartition(topic3, 1)) sendRecords(producer, numRecords = 1000, new TopicPartition(topic3, 1))
@ -370,7 +371,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
awaitAssignment(consumer, assignment) awaitAssignment(consumer, assignment)
val topic4 = "tsomec" // matches subscribed pattern val topic4 = "tsomec" // matches subscribed pattern
createTopic(topic4, 2, serverCount) createTopic(topic4, 2, brokerCount)
sendRecords(producer, numRecords = 1000, new TopicPartition(topic4, 0)) sendRecords(producer, numRecords = 1000, new TopicPartition(topic4, 0))
sendRecords(producer, numRecords = 1000, new TopicPartition(topic4, 1)) sendRecords(producer, numRecords = 1000, new TopicPartition(topic4, 1))
@ -404,7 +405,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
// the first topic ('topic') matches first subscription pattern only // the first topic ('topic') matches first subscription pattern only
val fooTopic = "foo" // matches both subscription patterns val fooTopic = "foo" // matches both subscription patterns
createTopic(fooTopic, 1, serverCount) createTopic(fooTopic, 1, brokerCount)
sendRecords(producer, numRecords = 1000, new TopicPartition(fooTopic, 0)) sendRecords(producer, numRecords = 1000, new TopicPartition(fooTopic, 0))
assertEquals(0, consumer.assignment().size) assertEquals(0, consumer.assignment().size)
@ -419,7 +420,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
awaitAssignment(consumer, assignment) awaitAssignment(consumer, assignment)
val barTopic = "bar" // matches the next subscription pattern val barTopic = "bar" // matches the next subscription pattern
createTopic(barTopic, 1, serverCount) createTopic(barTopic, 1, brokerCount)
sendRecords(producer, numRecords = 1000, new TopicPartition(barTopic, 0)) sendRecords(producer, numRecords = 1000, new TopicPartition(barTopic, 0))
val pattern2 = Pattern.compile("...") // only 'foo' and 'bar' match this val pattern2 = Pattern.compile("...") // only 'foo' and 'bar' match this
@ -450,7 +451,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
sendRecords(producer, numRecords, tp) sendRecords(producer, numRecords, tp)
val topic1 = "tblablac" // matches the subscription pattern val topic1 = "tblablac" // matches the subscription pattern
createTopic(topic1, 2, serverCount) createTopic(topic1, 2, brokerCount)
sendRecords(producer, numRecords = 1000, new TopicPartition(topic1, 0)) sendRecords(producer, numRecords = 1000, new TopicPartition(topic1, 0))
sendRecords(producer, numRecords = 1000, new TopicPartition(topic1, 1)) sendRecords(producer, numRecords = 1000, new TopicPartition(topic1, 1))
@ -517,7 +518,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
consumer.subscribe(List(topic).asJava) consumer.subscribe(List(topic).asJava)
awaitAssignment(consumer, initialAssignment) awaitAssignment(consumer, initialAssignment)
createTopic(otherTopic, 2, serverCount) createTopic(otherTopic, 2, brokerCount)
val expandedAssignment = initialAssignment ++ Set(new TopicPartition(otherTopic, 0), new TopicPartition(otherTopic, 1)) val expandedAssignment = initialAssignment ++ Set(new TopicPartition(otherTopic, 0), new TopicPartition(otherTopic, 1))
consumer.subscribe(List(topic, otherTopic).asJava) consumer.subscribe(List(topic, otherTopic).asJava)
awaitAssignment(consumer, expandedAssignment) awaitAssignment(consumer, expandedAssignment)
@ -526,7 +527,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
@Test @Test
def testShrinkingTopicSubscriptions() { def testShrinkingTopicSubscriptions() {
val otherTopic = "other" val otherTopic = "other"
createTopic(otherTopic, 2, serverCount) createTopic(otherTopic, 2, brokerCount)
val initialAssignment = Set(new TopicPartition(topic, 0), new TopicPartition(topic, 1), new TopicPartition(otherTopic, 0), new TopicPartition(otherTopic, 1)) val initialAssignment = Set(new TopicPartition(topic, 0), new TopicPartition(topic, 1), new TopicPartition(otherTopic, 0), new TopicPartition(otherTopic, 1))
val consumer = createConsumer() val consumer = createConsumer()
consumer.subscribe(List(topic, otherTopic).asJava) consumer.subscribe(List(topic, otherTopic).asJava)
@ -781,7 +782,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
val partitionCount = 30 val partitionCount = 30
val topics = Seq(topic1, topic2, topic3) val topics = Seq(topic1, topic2, topic3)
topics.foreach { topicName => topics.foreach { topicName =>
createTopic(topicName, partitionCount, serverCount) createTopic(topicName, partitionCount, brokerCount)
} }
val partitions = topics.flatMap { topic => val partitions = topics.flatMap { topic =>
@ -861,11 +862,11 @@ class PlaintextConsumerTest extends BaseConsumerTest {
// for the topic partition assignment // for the topic partition assignment
val (consumerGroup, consumerPollers) = createConsumerGroupAndWaitForAssignment(10, List(topic1, topic2), subscriptions) val (consumerGroup, consumerPollers) = createConsumerGroupAndWaitForAssignment(10, List(topic1, topic2), subscriptions)
try { try {
validateGroupAssignment(consumerPollers, subscriptions, s"Did not get valid initial assignment for partitions ${subscriptions.asJava}") validateGroupAssignment(consumerPollers, subscriptions)
// add one more consumer and validate re-assignment // add one more consumer and validate re-assignment
addConsumersToGroupAndWaitForGroupAssignment(1, consumerGroup, consumerPollers, addConsumersToGroupAndWaitForGroupAssignment(1, consumerGroup, consumerPollers,
List(topic1, topic2), subscriptions) List(topic1, topic2), subscriptions, "roundrobin-group")
} finally { } finally {
consumerPollers.foreach(_.shutdown()) consumerPollers.foreach(_.shutdown())
} }
@ -900,11 +901,11 @@ class PlaintextConsumerTest extends BaseConsumerTest {
// create a group of consumers, subscribe the consumers to the single topic and start polling // create a group of consumers, subscribe the consumers to the single topic and start polling
// for the topic partition assignment // for the topic partition assignment
val (consumerGroup, consumerPollers) = createConsumerGroupAndWaitForAssignment(9, List(topic), partitions) val (consumerGroup, consumerPollers) = createConsumerGroupAndWaitForAssignment(9, List(topic), partitions)
validateGroupAssignment(consumerPollers, partitions, s"Did not get valid initial assignment for partitions ${partitions.asJava}") validateGroupAssignment(consumerPollers, partitions)
val prePartition2PollerId = reverse(consumerPollers.map(poller => (poller.getId, poller.consumerAssignment())).toMap) val prePartition2PollerId = reverse(consumerPollers.map(poller => (poller.getId, poller.consumerAssignment())).toMap)
// add one more consumer and validate re-assignment // add one more consumer and validate re-assignment
addConsumersToGroupAndWaitForGroupAssignment(1, consumerGroup, consumerPollers, List(topic), partitions) addConsumersToGroupAndWaitForGroupAssignment(1, consumerGroup, consumerPollers, List(topic), partitions, "sticky-group")
val postPartition2PollerId = reverse(consumerPollers.map(poller => (poller.getId, poller.consumerAssignment())).toMap) val postPartition2PollerId = reverse(consumerPollers.map(poller => (poller.getId, poller.consumerAssignment())).toMap)
val keys = prePartition2PollerId.keySet.union(postPartition2PollerId.keySet) val keys = prePartition2PollerId.keySet.union(postPartition2PollerId.keySet)
@ -945,7 +946,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
val consumerPollers = subscribeConsumers(consumersInGroup, List(topic, topic1)) val consumerPollers = subscribeConsumers(consumersInGroup, List(topic, topic1))
try { try {
validateGroupAssignment(consumerPollers, subscriptions, s"Did not get valid initial assignment for partitions ${subscriptions.asJava}") validateGroupAssignment(consumerPollers, subscriptions)
// add 2 more consumers and validate re-assignment // add 2 more consumers and validate re-assignment
addConsumersToGroupAndWaitForGroupAssignment(2, consumersInGroup, consumerPollers, List(topic, topic1), subscriptions) addConsumersToGroupAndWaitForGroupAssignment(2, consumersInGroup, consumerPollers, List(topic, topic1), subscriptions)
@ -1040,7 +1041,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
@Test @Test
def testAutoCommitIntercept() { def testAutoCommitIntercept() {
val topic2 = "topic2" val topic2 = "topic2"
createTopic(topic2, 2, serverCount) createTopic(topic2, 2, brokerCount)
// produce records // produce records
val numRecords = 100 val numRecords = 100
@ -1336,7 +1337,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
@Test @Test
def testAutoCommitOnRebalance() { def testAutoCommitOnRebalance() {
val topic2 = "topic2" val topic2 = "topic2"
createTopic(topic2, 2, serverCount) createTopic(topic2, 2, brokerCount)
this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true") this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
val consumer = createConsumer() val consumer = createConsumer()
@ -1376,7 +1377,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
def testPerPartitionLeadMetricsCleanUpWithSubscribe() { def testPerPartitionLeadMetricsCleanUpWithSubscribe() {
val numMessages = 1000 val numMessages = 1000
val topic2 = "topic2" val topic2 = "topic2"
createTopic(topic2, 2, serverCount) createTopic(topic2, 2, brokerCount)
// send some messages. // send some messages.
val producer = createProducer() val producer = createProducer()
sendRecords(producer, numMessages, tp) sendRecords(producer, numMessages, tp)
@ -1415,7 +1416,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
def testPerPartitionLagMetricsCleanUpWithSubscribe() { def testPerPartitionLagMetricsCleanUpWithSubscribe() {
val numMessages = 1000 val numMessages = 1000
val topic2 = "topic2" val topic2 = "topic2"
createTopic(topic2, 2, serverCount) createTopic(topic2, 2, brokerCount)
// send some messages. // send some messages.
val producer = createProducer() val producer = createProducer()
sendRecords(producer, numMessages, tp) sendRecords(producer, numMessages, tp)
@ -1635,57 +1636,21 @@ class PlaintextConsumerTest extends BaseConsumerTest {
consumerPollers += timeoutPoller consumerPollers += timeoutPoller
// validate the initial assignment // validate the initial assignment
validateGroupAssignment(consumerPollers, subscriptions, s"Did not get valid initial assignment for partitions ${subscriptions.asJava}") validateGroupAssignment(consumerPollers, subscriptions)
// stop polling and close one of the consumers, should trigger partition re-assignment among alive consumers // stop polling and close one of the consumers, should trigger partition re-assignment among alive consumers
timeoutPoller.shutdown() timeoutPoller.shutdown()
if (closeConsumer) if (closeConsumer)
timeoutConsumer.close() timeoutConsumer.close()
val maxSessionTimeout = this.serverConfig.getProperty(KafkaConfig.GroupMaxSessionTimeoutMsProp).toLong
validateGroupAssignment(consumerPollers, subscriptions, validateGroupAssignment(consumerPollers, subscriptions,
s"Did not get valid assignment for partitions ${subscriptions.asJava} after one consumer left", 3 * maxSessionTimeout) Some(s"Did not get valid assignment for partitions ${subscriptions.asJava} after one consumer left"), 3 * groupMaxSessionTimeoutMs)
// done with pollers and consumers // done with pollers and consumers
for (poller <- consumerPollers) for (poller <- consumerPollers)
poller.shutdown() poller.shutdown()
} }
/**
* Creates topic 'topicName' with 'numPartitions' partitions and produces 'recordsPerPartition'
* records to each partition
*/
def createTopicAndSendRecords(producer: KafkaProducer[Array[Byte], Array[Byte]],
topicName: String,
numPartitions: Int,
recordsPerPartition: Int): Set[TopicPartition] = {
createTopic(topicName, numPartitions, serverCount)
var parts = Set[TopicPartition]()
for (partition <- 0 until numPartitions) {
val tp = new TopicPartition(topicName, partition)
sendRecords(producer, recordsPerPartition, tp)
parts = parts + tp
}
parts
}
/**
* Subscribes consumer 'consumer' to a given list of topics 'topicsToSubscribe', creates
* a consumer poller and starts polling.
* Assumes that the consumer is not subscribed to any topics yet.
*
* @param consumer consumer
* @param topicsToSubscribe topics that this consumer will subscribe to
* @return consumer poller for the given consumer
*/
def subscribeConsumerAndStartPolling(consumer: Consumer[Array[Byte], Array[Byte]],
topicsToSubscribe: List[String]): ConsumerAssignmentPoller = {
assertEquals(0, consumer.assignment().size)
val consumerPoller = new ConsumerAssignmentPoller(consumer, topicsToSubscribe)
consumerPoller.start()
consumerPoller
}
/** /**
* Creates consumer pollers corresponding to a given consumer group, one per consumer; subscribes consumers to * Creates consumer pollers corresponding to a given consumer group, one per consumer; subscribes consumers to
* 'topicsToSubscribe' topics, waits until consumers get topics assignment. * 'topicsToSubscribe' topics, waits until consumers get topics assignment.
@ -1696,14 +1661,32 @@ class PlaintextConsumerTest extends BaseConsumerTest {
* @param topicsToSubscribe topics to which consumers will subscribe to * @param topicsToSubscribe topics to which consumers will subscribe to
* @return collection of consumer pollers * @return collection of consumer pollers
*/ */
def subscribeConsumers(consumerGroup: Buffer[KafkaConsumer[Array[Byte], Array[Byte]]], def subscribeConsumers(consumerGroup: mutable.Buffer[KafkaConsumer[Array[Byte], Array[Byte]]],
topicsToSubscribe: List[String]): Buffer[ConsumerAssignmentPoller] = { topicsToSubscribe: List[String]): mutable.Buffer[ConsumerAssignmentPoller] = {
val consumerPollers = Buffer[ConsumerAssignmentPoller]() val consumerPollers = mutable.Buffer[ConsumerAssignmentPoller]()
for (consumer <- consumerGroup) for (consumer <- consumerGroup)
consumerPollers += subscribeConsumerAndStartPolling(consumer, topicsToSubscribe) consumerPollers += subscribeConsumerAndStartPolling(consumer, topicsToSubscribe)
consumerPollers consumerPollers
} }
/**
* Creates topic 'topicName' with 'numPartitions' partitions and produces 'recordsPerPartition'
* records to each partition
*/
def createTopicAndSendRecords(producer: KafkaProducer[Array[Byte], Array[Byte]],
topicName: String,
numPartitions: Int,
recordsPerPartition: Int): Set[TopicPartition] = {
createTopic(topicName, numPartitions, brokerCount)
var parts = Set[TopicPartition]()
for (partition <- 0 until numPartitions) {
val tp = new TopicPartition(topicName, partition)
sendRecords(producer, recordsPerPartition, tp)
parts = parts + tp
}
parts
}
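A hedged usage sketch of these helpers inside a test body (the topic name and counts are made up; subscribeConsumerAndStartPolling and validateGroupAssignment are assumed to remain reachable from the shared base class after this refactor):

val producer = createProducer()
// create the topic, produce to every partition, and collect the partitions
val subscriptions = createTopicAndSendRecords(producer, "example-topic", numPartitions = 2, recordsPerPartition = 100)
// start a non-stop poller for a single consumer subscribed to the topic
val poller = subscribeConsumerAndStartPolling(createConsumer(), List("example-topic"))
try {
  // wait until the lone consumer owns every partition of the topic
  validateGroupAssignment(mutable.Buffer(poller), subscriptions)
} finally {
  poller.shutdown()
}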
/** /**
* Creates 'consumerCount' consumers and consumer pollers, one per consumer; subscribes consumers to * Creates 'consumerCount' consumers and consumer pollers, one per consumer; subscribes consumers to
* 'topicsToSubscribe' topics, waits until consumers get topics assignment. * 'topicsToSubscribe' topics, waits until consumers get topics assignment.
@ -1728,54 +1711,6 @@ class PlaintextConsumerTest extends BaseConsumerTest {
(consumerGroup, consumerPollers) (consumerGroup, consumerPollers)
} }
/**
* Create 'numOfConsumersToAdd' consumers, add them to the consumer group 'consumerGroup', and create corresponding
* pollers for these consumers. Wait for partition re-assignment and validate.
*
* Currently, assignment validation requires that the total number of partitions is greater than or equal to the
* number of consumers, so subscriptions.size must be greater than or equal to the resulting number of consumers in the group
*
* @param numOfConsumersToAdd number of consumers to create and add to the consumer group
* @param consumerGroup current consumer group
* @param consumerPollers current consumer pollers
* @param topicsToSubscribe topics to which new consumers will subscribe to
* @param subscriptions set of all topic partitions
*/
def addConsumersToGroupAndWaitForGroupAssignment(numOfConsumersToAdd: Int,
consumerGroup: Buffer[KafkaConsumer[Array[Byte], Array[Byte]]],
consumerPollers: Buffer[ConsumerAssignmentPoller],
topicsToSubscribe: List[String],
subscriptions: Set[TopicPartition]): Unit = {
assertTrue(consumerGroup.size + numOfConsumersToAdd <= subscriptions.size)
for (_ <- 0 until numOfConsumersToAdd) {
val consumer = createConsumer()
consumerGroup += consumer
consumerPollers += subscribeConsumerAndStartPolling(consumer, topicsToSubscribe)
}
// wait until topics get re-assigned and validate assignment
validateGroupAssignment(consumerPollers, subscriptions,
s"Did not get valid assignment for partitions ${subscriptions.asJava} after we added $numOfConsumersToAdd consumer(s)")
}
/**
* Wait for consumers to get partition assignment and validate it.
*
* @param consumerPollers consumer pollers corresponding to the consumer group we are testing
* @param subscriptions set of all topic partitions
* @param msg message to print when waiting for/validating assignment fails
*/
def validateGroupAssignment(consumerPollers: Buffer[ConsumerAssignmentPoller],
subscriptions: Set[TopicPartition],
msg: String,
waitTime: Long = 10000L): Unit = {
TestUtils.waitUntilTrue(() => {
val assignments = Buffer[Set[TopicPartition]]()
consumerPollers.foreach(assignments += _.consumerAssignment())
isPartitionAssignmentValid(assignments, subscriptions)
}, msg, waitTime)
}
def changeConsumerGroupSubscriptionAndValidateAssignment(consumerPollers: Buffer[ConsumerAssignmentPoller], def changeConsumerGroupSubscriptionAndValidateAssignment(consumerPollers: Buffer[ConsumerAssignmentPoller],
topicsToSubscribe: List[String], topicsToSubscribe: List[String],
subscriptions: Set[TopicPartition]): Unit = { subscriptions: Set[TopicPartition]): Unit = {
@ -1785,11 +1720,11 @@ class PlaintextConsumerTest extends BaseConsumerTest {
// since subscribe call to poller does not actually call consumer subscribe right away, wait // since subscribe call to poller does not actually call consumer subscribe right away, wait
// until subscribe is called on all consumers // until subscribe is called on all consumers
TestUtils.waitUntilTrue(() => { TestUtils.waitUntilTrue(() => {
consumerPollers forall (poller => poller.isSubscribeRequestProcessed()) consumerPollers.forall { poller => poller.isSubscribeRequestProcessed }
}, s"Failed to call subscribe on all consumers in the group for subscription ${subscriptions}", 1000L) }, s"Failed to call subscribe on all consumers in the group for subscription $subscriptions", 1000L)
validateGroupAssignment(consumerPollers, subscriptions, validateGroupAssignment(consumerPollers, subscriptions,
s"Did not get valid assignment for partitions ${subscriptions.asJava} after we changed subscription") Some(s"Did not get valid assignment for partitions ${subscriptions.asJava} after we changed subscription"))
} }
def changeConsumerSubscriptionAndValidateAssignment[K, V](consumer: Consumer[K, V], def changeConsumerSubscriptionAndValidateAssignment[K, V](consumer: Consumer[K, V],
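The call sites above now pass the failure message as an Option[String]; the helper itself presumably moved to the shared base class with a default message and wait time, roughly along these lines (a sketch reconstructed from the removed version, not the exact implementation):

def validateGroupAssignment(consumerPollers: mutable.Buffer[ConsumerAssignmentPoller],
                            subscriptions: Set[TopicPartition],
                            msg: Option[String] = None,
                            waitTime: Long = 10000L): Unit = {
  // assumes scala.collection.mutable, kafka.utils.TestUtils and
  // scala.collection.JavaConverters._ are in scope, as in this test class
  TestUtils.waitUntilTrue(() => {
    val assignments = mutable.Buffer[Set[TopicPartition]]()
    consumerPollers.foreach(poller => assignments += poller.consumerAssignment())
    isPartitionAssignmentValid(assignments, subscriptions)
  }, msg.getOrElse(s"Did not get valid assignment for partitions ${subscriptions.asJava}"), waitTime)
}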


@ -39,7 +39,7 @@ class SaslClientsWithInvalidCredentialsTest extends IntegrationTestHarness with
override protected val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism)) override protected val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism))
val consumerCount = 1 val consumerCount = 1
val producerCount = 1 val producerCount = 1
val serverCount = 1 val brokerCount = 1
this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "1") this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "1")
this.serverConfig.setProperty(KafkaConfig.TransactionsTopicReplicationFactorProp, "1") this.serverConfig.setProperty(KafkaConfig.TransactionsTopicReplicationFactorProp, "1")
@ -62,7 +62,7 @@ class SaslClientsWithInvalidCredentialsTest extends IntegrationTestHarness with
startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), Both, startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), Both,
JaasTestUtils.KafkaServerContextName)) JaasTestUtils.KafkaServerContextName))
super.setUp() super.setUp()
createTopic(topic, numPartitions, serverCount) createTopic(topic, numPartitions, brokerCount)
} }
@After @After


@ -13,7 +13,7 @@
package kafka.api package kafka.api
import java.io.File import java.io.File
import java.util.Locale import java.util.{Locale, Properties}
import kafka.server.KafkaConfig import kafka.server.KafkaConfig
import kafka.utils.{JaasTestUtils, TestUtils} import kafka.utils.{JaasTestUtils, TestUtils}
@ -47,6 +47,11 @@ class SaslPlainPlaintextConsumerTest extends BaseConsumerTest with SaslSetup {
closeSasl() closeSasl()
} }
override def modifyConfigs(props: Seq[Properties]): Unit = {
super.modifyConfigs(props)
configureListeners(props)
}
/** /**
* Checks that everyone can access ZkUtils.SecureZkRootPaths and ZkUtils.SensitiveZkRootPaths * Checks that everyone can access ZkUtils.SecureZkRootPaths and ZkUtils.SensitiveZkRootPaths
* when zookeeper.set.acl=false, even if ZooKeeper is SASL-enabled. * when zookeeper.set.acl=false, even if ZooKeeper is SASL-enabled.


@ -39,7 +39,7 @@ import scala.collection.JavaConverters._
class DynamicConnectionQuotaTest extends BaseRequestTest { class DynamicConnectionQuotaTest extends BaseRequestTest {
override def numBrokers = 1 override def brokerCount = 1
val topic = "test" val topic = "test"
val listener = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT) val listener = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
@ -49,7 +49,7 @@ class DynamicConnectionQuotaTest extends BaseRequestTest {
@Before @Before
override def setUp(): Unit = { override def setUp(): Unit = {
super.setUp() super.setUp()
TestUtils.createTopic(zkClient, topic, numBrokers, numBrokers, servers) TestUtils.createTopic(zkClient, topic, brokerCount, brokerCount, servers)
} }
@After @After
@ -64,8 +64,8 @@ class DynamicConnectionQuotaTest extends BaseRequestTest {
} }
} }
override protected def propertyOverrides(properties: Properties): Unit = { override protected def brokerPropertyOverrides(properties: Properties): Unit = {
super.propertyOverrides(properties) super.brokerPropertyOverrides(properties)
} }
@Test @Test


@ -39,7 +39,7 @@ import org.junit.{After, Before, Test}
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
class GssapiAuthenticationTest extends IntegrationTestHarness with SaslSetup { class GssapiAuthenticationTest extends IntegrationTestHarness with SaslSetup {
override val serverCount = 1 override val brokerCount = 1
override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT
private val kafkaClientSaslMechanism = "GSSAPI" private val kafkaClientSaslMechanism = "GSSAPI"
@ -70,7 +70,7 @@ class GssapiAuthenticationTest extends IntegrationTestHarness with SaslSetup {
clientConfig.put(CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG, "5000") clientConfig.put(CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG, "5000")
// create the test topic with all the brokers as replicas // create the test topic with all the brokers as replicas
createTopic(topic, 2, serverCount) createTopic(topic, 2, brokerCount)
} }
@After @After


@ -36,7 +36,7 @@ import scala.collection.JavaConverters._
*/ */
class ScramServerStartupTest extends IntegrationTestHarness with SaslSetup { class ScramServerStartupTest extends IntegrationTestHarness with SaslSetup {
override val serverCount = 1 override val brokerCount = 1
private val kafkaClientSaslMechanism = "SCRAM-SHA-256" private val kafkaClientSaslMechanism = "SCRAM-SHA-256"
private val kafkaServerSaslMechanisms = Collections.singletonList("SCRAM-SHA-256").asScala private val kafkaServerSaslMechanisms = Collections.singletonList("SCRAM-SHA-256").asScala


@ -33,7 +33,7 @@ import scala.collection.JavaConverters._
class AddPartitionsTest extends BaseRequestTest { class AddPartitionsTest extends BaseRequestTest {
protected override def numBrokers: Int = 4 override def brokerCount: Int = 4
val partitionId = 0 val partitionId = 0


@ -39,7 +39,7 @@ class DelegationTokenCommandTest extends BaseRequestTest with SaslSetup {
protected override val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism)) protected override val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism))
var adminClient: org.apache.kafka.clients.admin.AdminClient = null var adminClient: org.apache.kafka.clients.admin.AdminClient = null
override def numBrokers = 1 override def brokerCount = 1
@Before @Before
override def setUp(): Unit = { override def setUp(): Unit = {
@ -48,11 +48,11 @@ class DelegationTokenCommandTest extends BaseRequestTest with SaslSetup {
} }
override def generateConfigs = { override def generateConfigs = {
val props = TestUtils.createBrokerConfigs(numBrokers, zkConnect, val props = TestUtils.createBrokerConfigs(brokerCount, zkConnect,
enableControlledShutdown = false, enableControlledShutdown = false,
interBrokerSecurityProtocol = Some(securityProtocol), interBrokerSecurityProtocol = Some(securityProtocol),
trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties, enableToken = true) trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties, enableToken = true)
props.foreach(propertyOverrides) props.foreach(brokerPropertyOverrides)
props.map(KafkaConfig.fromProps) props.map(KafkaConfig.fromProps)
} }


@ -33,7 +33,7 @@ import scala.collection.JavaConverters._
class AbstractCreateTopicsRequestTest extends BaseRequestTest { class AbstractCreateTopicsRequestTest extends BaseRequestTest {
override def propertyOverrides(properties: Properties): Unit = override def brokerPropertyOverrides(properties: Properties): Unit =
properties.put(KafkaConfig.AutoCreateTopicsEnableProp, false.toString) properties.put(KafkaConfig.AutoCreateTopicsEnableProp, false.toString)
def topicsReq(topics: Seq[CreatableTopic], def topicsReq(topics: Seq[CreatableTopic],


@ -31,7 +31,7 @@ class AddPartitionsToTxnRequestTest extends BaseRequestTest {
private val topic1 = "foobartopic" private val topic1 = "foobartopic"
val numPartitions = 3 val numPartitions = 3
override def propertyOverrides(properties: Properties): Unit = override def brokerPropertyOverrides(properties: Properties): Unit =
properties.put(KafkaConfig.AutoCreateTopicsEnableProp, false.toString) properties.put(KafkaConfig.AutoCreateTopicsEnableProp, false.toString)
@Before @Before


@ -33,7 +33,7 @@ import scala.util.Random
class AlterReplicaLogDirsRequestTest extends BaseRequestTest { class AlterReplicaLogDirsRequestTest extends BaseRequestTest {
override val logDirCount = 5 override val logDirCount = 5
override val numBrokers = 1 override val brokerCount = 1
val topic = "topic" val topic = "topic"


@ -40,7 +40,7 @@ object ApiVersionsRequestTest {
class ApiVersionsRequestTest extends BaseRequestTest { class ApiVersionsRequestTest extends BaseRequestTest {
override def numBrokers: Int = 1 override def brokerCount: Int = 1
@Test @Test
def testApiVersionsRequest() { def testApiVersionsRequest() {


@ -24,7 +24,6 @@ import java.util.Properties
import kafka.api.IntegrationTestHarness import kafka.api.IntegrationTestHarness
import kafka.network.SocketServer import kafka.network.SocketServer
import kafka.utils._
import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.types.Struct import org.apache.kafka.common.protocol.types.Struct
import org.apache.kafka.common.protocol.ApiKeys import org.apache.kafka.common.protocol.ApiKeys
@ -32,22 +31,19 @@ import org.apache.kafka.common.requests.{AbstractRequest, AbstractRequestRespons
import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.security.auth.SecurityProtocol
abstract class BaseRequestTest extends IntegrationTestHarness { abstract class BaseRequestTest extends IntegrationTestHarness {
override val serverCount: Int = numBrokers
private var correlationId = 0 private var correlationId = 0
// If required, set number of brokers // If required, set number of brokers
protected def numBrokers: Int = 3 override def brokerCount: Int = 3
// If required, override properties by mutating the passed Properties object // If required, override properties by mutating the passed Properties object
protected def propertyOverrides(properties: Properties) {} protected def brokerPropertyOverrides(properties: Properties) {}
override def generateConfigs = { override def modifyConfigs(props: Seq[Properties]): Unit = {
val props = TestUtils.createBrokerConfigs(numBrokers, zkConnect, props.foreach { p =>
enableControlledShutdown = false, p.put(KafkaConfig.ControlledShutdownEnableProp, "false")
interBrokerSecurityProtocol = Some(securityProtocol), brokerPropertyOverrides(p)
trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties, logDirCount = logDirCount) }
props.foreach(propertyOverrides)
props.map(KafkaConfig.fromProps)
} }
def anySocketServer = { def anySocketServer = {
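With this change a request test layers per-broker overrides on top of the harness through brokerPropertyOverrides instead of re-implementing generateConfigs. A minimal sketch (the class name and property are hypothetical):

import java.util.Properties
import kafka.server.KafkaConfig

class ExampleRequestTest extends BaseRequestTest {
  override def brokerCount: Int = 1

  // Hypothetical override: BaseRequestTest.modifyConfigs applies this to every
  // broker's Properties after disabling controlled shutdown.
  override protected def brokerPropertyOverrides(properties: Properties): Unit = {
    properties.put(KafkaConfig.AutoCreateTopicsEnableProp, "false")
  }
}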


@ -55,7 +55,7 @@ class CreateTopicsRequestTest extends AbstractCreateTopicsRequestTest {
validateErrorCreateTopicsRequests(topicsReq(Seq(topicReq("error-partitions", numPartitions = -1))), validateErrorCreateTopicsRequests(topicsReq(Seq(topicReq("error-partitions", numPartitions = -1))),
Map("error-partitions" -> error(Errors.INVALID_PARTITIONS)), checkErrorMessage = false) Map("error-partitions" -> error(Errors.INVALID_PARTITIONS)), checkErrorMessage = false)
validateErrorCreateTopicsRequests(topicsReq(Seq(topicReq("error-replication", validateErrorCreateTopicsRequests(topicsReq(Seq(topicReq("error-replication",
replicationFactor = numBrokers + 1))), replicationFactor = brokerCount + 1))),
Map("error-replication" -> error(Errors.INVALID_REPLICATION_FACTOR)), checkErrorMessage = false) Map("error-replication" -> error(Errors.INVALID_REPLICATION_FACTOR)), checkErrorMessage = false)
validateErrorCreateTopicsRequests(topicsReq(Seq(topicReq("error-config", validateErrorCreateTopicsRequests(topicsReq(Seq(topicReq("error-config",
config=Map("not.a.property" -> "error")))), config=Map("not.a.property" -> "error")))),
@ -71,7 +71,7 @@ class CreateTopicsRequestTest extends AbstractCreateTopicsRequestTest {
validateErrorCreateTopicsRequests(topicsReq(Seq( validateErrorCreateTopicsRequests(topicsReq(Seq(
topicReq(existingTopic), topicReq(existingTopic),
topicReq("partial-partitions", numPartitions = -1), topicReq("partial-partitions", numPartitions = -1),
topicReq("partial-replication", replicationFactor=numBrokers + 1), topicReq("partial-replication", replicationFactor=brokerCount + 1),
topicReq("partial-assignment", assignment=Map(0 -> List(0, 1), 1 -> List(0))), topicReq("partial-assignment", assignment=Map(0 -> List(0, 1), 1 -> List(0))),
topicReq("partial-none"))), topicReq("partial-none"))),
Map( Map(


@ -32,8 +32,8 @@ import scala.collection.JavaConverters._
class CreateTopicsRequestWithPolicyTest extends AbstractCreateTopicsRequestTest { class CreateTopicsRequestWithPolicyTest extends AbstractCreateTopicsRequestTest {
import CreateTopicsRequestWithPolicyTest._ import CreateTopicsRequestWithPolicyTest._
override def propertyOverrides(properties: Properties): Unit = { override def brokerPropertyOverrides(properties: Properties): Unit = {
super.propertyOverrides(properties) super.brokerPropertyOverrides(properties)
properties.put(KafkaConfig.CreateTopicPolicyClassNameProp, classOf[Policy].getName) properties.put(KafkaConfig.CreateTopicPolicyClassNameProp, classOf[Policy].getName)
} }
@ -94,7 +94,7 @@ class CreateTopicsRequestWithPolicyTest extends AbstractCreateTopicsRequestTest
Some("Topic 'existing-topic' already exists.")))) Some("Topic 'existing-topic' already exists."))))
validateErrorCreateTopicsRequests(topicsReq(Seq(topicReq("error-replication", validateErrorCreateTopicsRequests(topicsReq(Seq(topicReq("error-replication",
numPartitions = 10, replicationFactor = numBrokers + 1)), validateOnly = true), numPartitions = 10, replicationFactor = brokerCount + 1)), validateOnly = true),
Map("error-replication" -> error(Errors.INVALID_REPLICATION_FACTOR, Map("error-replication" -> error(Errors.INVALID_REPLICATION_FACTOR,
Some("Replication factor: 4 larger than available brokers: 3.")))) Some("Replication factor: 4 larger than available brokers: 3."))))


@ -29,7 +29,7 @@ import scala.concurrent.ExecutionException
class DelegationTokenRequestsOnPlainTextTest extends BaseRequestTest { class DelegationTokenRequestsOnPlainTextTest extends BaseRequestTest {
var adminClient: AdminClient = null var adminClient: AdminClient = null
override def numBrokers = 1 override def brokerCount = 1
@Before @Before
override def setUp(): Unit = { override def setUp(): Unit = {


@ -38,7 +38,7 @@ class DelegationTokenRequestsTest extends BaseRequestTest with SaslSetup {
protected override val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism)) protected override val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism))
var adminClient: AdminClient = null var adminClient: AdminClient = null
override def numBrokers = 1 override def brokerCount = 1
@Before @Before
override def setUp(): Unit = { override def setUp(): Unit = {
@ -47,11 +47,11 @@ class DelegationTokenRequestsTest extends BaseRequestTest with SaslSetup {
} }
override def generateConfigs = { override def generateConfigs = {
val props = TestUtils.createBrokerConfigs(numBrokers, zkConnect, val props = TestUtils.createBrokerConfigs(brokerCount, zkConnect,
enableControlledShutdown = false, enableControlledShutdown = false,
interBrokerSecurityProtocol = Some(securityProtocol), interBrokerSecurityProtocol = Some(securityProtocol),
trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties, enableToken = true) trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties, enableToken = true)
props.foreach(propertyOverrides) props.foreach(brokerPropertyOverrides)
props.map(KafkaConfig.fromProps) props.map(KafkaConfig.fromProps)
} }


@ -36,7 +36,7 @@ class DelegationTokenRequestsWithDisableTokenFeatureTest extends BaseRequestTest
protected override val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism)) protected override val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism))
var adminClient: AdminClient = null var adminClient: AdminClient = null
override def numBrokers = 1 override def brokerCount = 1
@Before @Before
override def setUp(): Unit = { override def setUp(): Unit = {


@ -29,14 +29,14 @@ import java.util.Collections
class DeleteTopicsRequestWithDeletionDisabledTest extends BaseRequestTest { class DeleteTopicsRequestWithDeletionDisabledTest extends BaseRequestTest {
override def numBrokers: Int = 1 override def brokerCount: Int = 1
override def generateConfigs = { override def generateConfigs = {
val props = TestUtils.createBrokerConfigs(numBrokers, zkConnect, val props = TestUtils.createBrokerConfigs(brokerCount, zkConnect,
enableControlledShutdown = false, enableDeleteTopic = false, enableControlledShutdown = false, enableDeleteTopic = false,
interBrokerSecurityProtocol = Some(securityProtocol), interBrokerSecurityProtocol = Some(securityProtocol),
trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties, logDirCount = logDirCount) trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties, logDirCount = logDirCount)
props.foreach(propertyOverrides) props.foreach(brokerPropertyOverrides)
props.map(KafkaConfig.fromProps) props.map(KafkaConfig.fromProps)
} }


@ -27,7 +27,7 @@ import java.io.File
class DescribeLogDirsRequestTest extends BaseRequestTest { class DescribeLogDirsRequestTest extends BaseRequestTest {
override val logDirCount = 2 override val logDirCount = 2
override val numBrokers: Int = 1 override val brokerCount: Int = 1
val topic = "topic" val topic = "topic"
val partitionNum = 2 val partitionNum = 2


@ -32,7 +32,7 @@ import org.junit.Test
class FetchRequestDownConversionConfigTest extends BaseRequestTest { class FetchRequestDownConversionConfigTest extends BaseRequestTest {
private var producer: KafkaProducer[String, String] = null private var producer: KafkaProducer[String, String] = null
override def numBrokers: Int = 1 override def brokerCount: Int = 1
override def setUp(): Unit = { override def setUp(): Unit = {
super.setUp() super.setUp()
@ -45,8 +45,8 @@ class FetchRequestDownConversionConfigTest extends BaseRequestTest {
super.tearDown() super.tearDown()
} }
override protected def propertyOverrides(properties: Properties): Unit = { override protected def brokerPropertyOverrides(properties: Properties): Unit = {
super.propertyOverrides(properties) super.brokerPropertyOverrides(properties)
properties.put(KafkaConfig.LogMessageDownConversionEnableProp, "false") properties.put(KafkaConfig.LogMessageDownConversionEnableProp, "false")
} }


@ -36,9 +36,9 @@ import java.util.concurrent.atomic.AtomicInteger
*/ */
class KafkaMetricReporterExceptionHandlingTest extends BaseRequestTest { class KafkaMetricReporterExceptionHandlingTest extends BaseRequestTest {
override def numBrokers: Int = 1 override def brokerCount: Int = 1
override def propertyOverrides(properties: Properties): Unit = { override def brokerPropertyOverrides(properties: Properties): Unit = {
properties.put(KafkaConfig.MetricReporterClassesProp, classOf[KafkaMetricReporterExceptionHandlingTest.BadReporter].getName + "," + classOf[KafkaMetricReporterExceptionHandlingTest.GoodReporter].getName) properties.put(KafkaConfig.MetricReporterClassesProp, classOf[KafkaMetricReporterExceptionHandlingTest.BadReporter].getName + "," + classOf[KafkaMetricReporterExceptionHandlingTest.GoodReporter].getName)
} }


@ -41,7 +41,7 @@ class LogDirFailureTest extends IntegrationTestHarness {
val producerCount: Int = 1 val producerCount: Int = 1
val consumerCount: Int = 1 val consumerCount: Int = 1
val serverCount: Int = 2 val brokerCount: Int = 2
private val topic = "topic" private val topic = "topic"
private val partitionNum = 12 private val partitionNum = 12
override val logDirCount = 3 override val logDirCount = 3
@ -52,7 +52,7 @@ class LogDirFailureTest extends IntegrationTestHarness {
@Before @Before
override def setUp() { override def setUp() {
super.setUp() super.setUp()
createTopic(topic, partitionNum, serverCount) createTopic(topic, partitionNum, brokerCount)
} }
@Test @Test
@ -71,7 +71,7 @@ class LogDirFailureTest extends IntegrationTestHarness {
var server: KafkaServer = null var server: KafkaServer = null
try { try {
val props = TestUtils.createBrokerConfig(serverCount, zkConnect, logDirCount = 3) val props = TestUtils.createBrokerConfig(brokerCount, zkConnect, logDirCount = 3)
props.put(KafkaConfig.InterBrokerProtocolVersionProp, "0.11.0") props.put(KafkaConfig.InterBrokerProtocolVersionProp, "0.11.0")
props.put(KafkaConfig.LogMessageFormatVersionProp, "0.11.0") props.put(KafkaConfig.LogMessageFormatVersionProp, "0.11.0")
val kafkaConfig = KafkaConfig.fromProps(props) val kafkaConfig = KafkaConfig.fromProps(props)
@ -118,7 +118,7 @@ class LogDirFailureTest extends IntegrationTestHarness {
// has fetched from the leader and attempts to append to the offline replica. // has fetched from the leader and attempts to append to the offline replica.
producer.send(record).get producer.send(record).get
assertEquals(serverCount, leaderServer.replicaManager.getPartition(new TopicPartition(topic, anotherPartitionWithTheSameLeader)).get.inSyncReplicas.size) assertEquals(brokerCount, leaderServer.replicaManager.getPartition(new TopicPartition(topic, anotherPartitionWithTheSameLeader)).get.inSyncReplicas.size)
followerServer.replicaManager.replicaFetcherManager.fetcherThreadMap.values.foreach { thread => followerServer.replicaManager.replicaFetcherManager.fetcherThreadMap.values.foreach { thread =>
assertFalse("ReplicaFetcherThread should still be working if its partition count > 0", thread.isShutdownComplete) assertFalse("ReplicaFetcherThread should still be working if its partition count > 0", thread.isShutdownComplete)
} }


@ -38,11 +38,11 @@ class LogOffsetTest extends BaseRequestTest {
private lazy val time = new MockTime private lazy val time = new MockTime
protected override def numBrokers = 1 override def brokerCount = 1
protected override def brokerTime(brokerId: Int) = time protected override def brokerTime(brokerId: Int) = time
protected override def propertyOverrides(props: Properties): Unit = { protected override def brokerPropertyOverrides(props: Properties): Unit = {
props.put("log.flush.interval.messages", "1") props.put("log.flush.interval.messages", "1")
props.put("num.partitions", "20") props.put("num.partitions", "20")
props.put("log.retention.hours", "10") props.put("log.retention.hours", "10")


@ -34,7 +34,7 @@ import scala.collection.JavaConverters._
class MetadataRequestTest extends BaseRequestTest { class MetadataRequestTest extends BaseRequestTest {
override def propertyOverrides(properties: Properties) { override def brokerPropertyOverrides(properties: Properties) {
properties.setProperty(KafkaConfig.DefaultReplicationFactorProp, "2") properties.setProperty(KafkaConfig.DefaultReplicationFactorProp, "2")
properties.setProperty(KafkaConfig.RackProp, s"rack/${properties.getProperty(KafkaConfig.BrokerIdProp)}") properties.setProperty(KafkaConfig.RackProp, s"rack/${properties.getProperty(KafkaConfig.BrokerIdProp)}")
} }


@ -49,7 +49,7 @@ import scala.collection.mutable.ListBuffer
class RequestQuotaTest extends BaseRequestTest { class RequestQuotaTest extends BaseRequestTest {
override def numBrokers: Int = 1 override def brokerCount: Int = 1
private val topic = "topic-1" private val topic = "topic-1"
private val numPartitions = 1 private val numPartitions = 1
@ -66,7 +66,7 @@ class RequestQuotaTest extends BaseRequestTest {
private val executor = Executors.newCachedThreadPool private val executor = Executors.newCachedThreadPool
private val tasks = new ListBuffer[Task] private val tasks = new ListBuffer[Task]
override def propertyOverrides(properties: Properties): Unit = { override def brokerPropertyOverrides(properties: Properties): Unit = {
properties.put(KafkaConfig.ControlledShutdownEnableProp, "false") properties.put(KafkaConfig.ControlledShutdownEnableProp, "false")
properties.put(KafkaConfig.OffsetsTopicReplicationFactorProp, "1") properties.put(KafkaConfig.OffsetsTopicReplicationFactorProp, "1")
properties.put(KafkaConfig.OffsetsTopicPartitionsProp, "1") properties.put(KafkaConfig.OffsetsTopicPartitionsProp, "1")


@ -36,7 +36,7 @@ class SaslApiVersionsRequestTest extends BaseRequestTest with SaslSetup {
private val kafkaServerSaslMechanisms = List("PLAIN") private val kafkaServerSaslMechanisms = List("PLAIN")
protected override val serverSaslProperties = Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism)) protected override val serverSaslProperties = Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism))
protected override val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism)) protected override val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism))
override def numBrokers = 1 override def brokerCount = 1
@Before @Before
override def setUp(): Unit = { override def setUp(): Unit = {


@ -28,7 +28,7 @@ import collection.JavaConverters._
class StopReplicaRequestTest extends BaseRequestTest { class StopReplicaRequestTest extends BaseRequestTest {
override val logDirCount = 2 override val logDirCount = 2
override val numBrokers: Int = 1 override val brokerCount: Int = 1
val topic = "topic" val topic = "topic"
val partitionNum = 2 val partitionNum = 2