KAFKA-18424: Consider splitting PlaintextAdminIntegrationTest#testConsumerGroups (#19093)
CI / build (push) Waiting to run Details

JIRA: KAFKA-18424
`PlaintextAdminIntegrationTest#testConsumerGroups` test too many things.
We should split it into smaller units.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
TengYao Chi 2025-05-26 12:10:49 +08:00 committed by GitHub
parent 2fe447a8a3
commit 48a52701b9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 179 additions and 124 deletions

View File

@ -1818,48 +1818,22 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
assertEquals(1, factory.failuresInjected) assertEquals(1, factory.failuresInjected)
} }
/**
* Test the consumer group APIs.
*/
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames)
@MethodSource(Array("getTestGroupProtocolParametersAll")) @MethodSource(Array("getTestGroupProtocolParametersAll"))
def testConsumerGroups(groupProtocol: String): Unit = { def testListConsumerGroupOffsets(groupProtocol: String): Unit = {
val config = createConfig val config = createConfig
client = Admin.create(config) client = Admin.create(config)
try { try {
// Verify that initially there are no consumer groups to list. assertConsumerGroupsIsClean()
val list1 = client.listConsumerGroups()
assertEquals(0, list1.all().get().size())
assertEquals(0, list1.errors().get().size())
assertEquals(0, list1.valid().get().size())
val testTopicName = "test_topic" val testTopicName = "test_topic"
val testTopicName1 = testTopicName + "1" prepareTopics(List(testTopicName), 2)
val testTopicName2 = testTopicName + "2" prepareRecords(testTopicName)
val testNumPartitions = 2
client.createTopics(util.List.of(
new NewTopic(testTopicName, testNumPartitions, 1.toShort),
new NewTopic(testTopicName1, testNumPartitions, 1.toShort),
new NewTopic(testTopicName2, testNumPartitions, 1.toShort)
)).all().get()
waitForTopics(client, List(testTopicName, testTopicName1, testTopicName2), List())
val producer = createProducer()
try {
producer.send(new ProducerRecord(testTopicName, 0, null, null)).get()
} finally {
Utils.closeQuietly(producer, "producer")
}
val testGroupId = "test_group_id" val testGroupId = "test_group_id"
val testClientId = "test_client_id" val testClientId = "test_client_id"
val testInstanceId1 = "test_instance_id_1" val groupInstances = Set("")
val testInstanceId2 = "test_instance_id_2" val topics = Set(testTopicName)
val fakeGroupId = "fake_group_id"
// contains two static members and one dynamic member
val groupInstanceSet = Set(testInstanceId1, testInstanceId2, "")
val topicSet = Set(testTopicName, testTopicName1, testTopicName2)
// We need to disable the auto commit because after the members got removed from group, the offset commit // We need to disable the auto commit because after the members got removed from group, the offset commit
// will cause the member rejoining and the test will be flaky (check ConsumerCoordinator#OffsetCommitResponseHandler) // will cause the member rejoining and the test will be flaky (check ConsumerCoordinator#OffsetCommitResponseHandler)
@ -1867,27 +1841,85 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
defaultConsumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") defaultConsumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
defaultConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, testGroupId) defaultConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, testGroupId)
defaultConsumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, testClientId) defaultConsumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, testClientId)
val backgroundConsumers = prepareConsumers(groupInstances, topics, defaultConsumerConfig)
val backgroundConsumerSet = new BackgroundConsumerSet(defaultConsumerConfig) try {
groupInstanceSet.zip(topicSet).foreach { case (groupInstanceId, topic) => // Start consumer polling threads in the background
val configOverrides = new Properties() backgroundConsumers.start()
if (groupInstanceId != "") { val topicPartition = new TopicPartition(testTopicName, 0)
// static member
configOverrides.setProperty(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, groupInstanceId) // Test listConsumerGroupOffsets
} TestUtils.waitUntilTrue(() => {
backgroundConsumerSet.addConsumer(topic, configOverrides) val parts = client.listConsumerGroupOffsets(testGroupId).partitionsToOffsetAndMetadata().get()
parts.containsKey(topicPartition) && (parts.get(topicPartition).offset() == 1)
}, "Expected the offset for partition 0 to eventually become 1.")
// Test listConsumerGroupOffsets with requireStable true
val options = new ListConsumerGroupOffsetsOptions().requireStable(true)
var parts = client.listConsumerGroupOffsets(testGroupId, options)
.partitionsToOffsetAndMetadata()
.get()
assertTrue(parts.containsKey(topicPartition))
assertEquals(1, parts.get(topicPartition).offset())
// Test listConsumerGroupOffsets with listConsumerGroupOffsetsSpec
val groupSpecs = util.Map.of(
testGroupId,
new ListConsumerGroupOffsetsSpec().topicPartitions(util.List.of(new TopicPartition(testTopicName, 0)))
)
parts = client.listConsumerGroupOffsets(groupSpecs)
.partitionsToOffsetAndMetadata()
.get()
assertTrue(parts.containsKey(topicPartition))
assertEquals(1, parts.get(topicPartition).offset())
// Test listConsumerGroupOffsets with listConsumerGroupOffsetsSpec and requireStable option
parts = client.listConsumerGroupOffsets(groupSpecs, options)
.partitionsToOffsetAndMetadata()
.get()
assertTrue(parts.containsKey(topicPartition))
assertEquals(1, parts.get(topicPartition).offset())
} finally {
backgroundConsumers.close()
} }
} finally {
Utils.closeQuietly(client, "adminClient")
}
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames)
@MethodSource(Array("getTestGroupProtocolParametersAll"))
def testListConsumerGroups(groupProtocol: String): Unit = {
val config = createConfig
client = Admin.create(config)
try {
assertConsumerGroupsIsClean()
val testTopicName = "test_topic"
prepareTopics(List(testTopicName), 2)
val testGroupId = "test_group_id"
val testClientId = "test_client_id"
val groupInstances = Set("")
val topics = Set(testTopicName)
// We need to disable the auto commit because after the members got removed from group, the offset commit
// will cause the member rejoining and the test will be flaky (check ConsumerCoordinator#OffsetCommitResponseHandler)
val defaultConsumerConfig = new Properties(consumerConfig)
defaultConsumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
defaultConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, testGroupId)
defaultConsumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, testClientId)
val backgroundConsumers = prepareConsumers(groupInstances, topics, defaultConsumerConfig)
try { try {
val groupType = if (groupProtocol.equalsIgnoreCase(GroupProtocol.CONSUMER.name)) GroupType.CONSUMER else GroupType.CLASSIC val groupType = if (groupProtocol.equalsIgnoreCase(GroupProtocol.CONSUMER.name)) GroupType.CONSUMER else GroupType.CLASSIC
// Start consumer polling threads in the background // Start consumer polling threads in the background
backgroundConsumerSet.start() backgroundConsumers.start()
// Test that we can list the new group. // Test that we can list the new group.
TestUtils.waitUntilTrue(() => { TestUtils.waitUntilTrue(() => {
val matching = client.listConsumerGroups.all.get.asScala.filter(group => val matching = client.listConsumerGroups.all.get.asScala.filter(group =>
group.groupId == testGroupId && group.groupId == testGroupId && group.groupState.get == GroupState.STABLE)
group.groupState.get == GroupState.STABLE)
matching.size == 1 matching.size == 1
}, s"Expected to be able to list $testGroupId") }, s"Expected to be able to list $testGroupId")
@ -1903,25 +1935,73 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
val options = new ListConsumerGroupsOptions().withTypes(util.Set.of(groupType)) val options = new ListConsumerGroupsOptions().withTypes(util.Set.of(groupType))
.inGroupStates(util.Set.of(GroupState.STABLE)) .inGroupStates(util.Set.of(GroupState.STABLE))
val matching = client.listConsumerGroups(options).all.get.asScala.filter(group => val matching = client.listConsumerGroups(options).all.get.asScala.filter(group =>
group.groupId == testGroupId && group.groupId == testGroupId && group.groupState.get == GroupState.STABLE)
group.groupState.get == GroupState.STABLE)
matching.size == 1 matching.size == 1
}, s"Expected to be able to list $testGroupId in group type $groupType and state Stable") }, s"Expected to be able to list $testGroupId in group type $groupType and state Stable")
TestUtils.waitUntilTrue(() => { TestUtils.waitUntilTrue(() => {
val options = new ListConsumerGroupsOptions().inGroupStates(util.Set.of(GroupState.STABLE)) val options = new ListConsumerGroupsOptions().inGroupStates(util.Set.of(GroupState.STABLE))
val matching = client.listConsumerGroups(options).all.get.asScala.filter(group => val matching = client.listConsumerGroups(options).all.get.asScala.filter(group =>
group.groupId == testGroupId && group.groupId == testGroupId && group.groupState.get == GroupState.STABLE)
group.groupState.get == GroupState.STABLE)
matching.size == 1 matching.size == 1
}, s"Expected to be able to list $testGroupId in state Stable") }, s"Expected to be able to list $testGroupId in state Stable")
TestUtils.waitUntilTrue(() => { TestUtils.waitUntilTrue(() => {
val options = new ListConsumerGroupsOptions().inGroupStates(util.Set.of(GroupState.EMPTY)) val options = new ListConsumerGroupsOptions().inGroupStates(util.Set.of(GroupState.EMPTY))
val matching = client.listConsumerGroups(options).all.get.asScala.filter( val matching = client.listConsumerGroups(options).all.get.asScala.filter(_.groupId == testGroupId)
_.groupId == testGroupId)
matching.isEmpty matching.isEmpty
}, s"Expected to find zero groups") }, "Expected to find zero groups")
} finally {
backgroundConsumers.close()
}
} finally {
Utils.closeQuietly(client, "adminClient")
}
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames)
@MethodSource(Array("getTestGroupProtocolParametersAll"))
def testDescribeGroups(groupProtocol: String): Unit = {
val config = createConfig
client = Admin.create(config)
try {
assertConsumerGroupsIsClean()
val testTopicName = "test_topic"
val testTopicName1 = testTopicName + "1"
val testTopicName2 = testTopicName + "2"
val testNumPartitions = 2
prepareTopics(List(testTopicName, testTopicName1, testTopicName2), testNumPartitions)
val producer = createProducer()
try {
producer.send(new ProducerRecord(testTopicName, 0, null, null)).get()
} finally {
Utils.closeQuietly(producer, "producer")
}
val testGroupId = "test_group_id"
val testClientId = "test_client_id"
val testInstanceId1 = "test_instance_id_1"
val testInstanceId2 = "test_instance_id_2"
val fakeGroupId = "fake_group_id"
// contains two static members and one dynamic member
val groupInstances = Set(testInstanceId1, testInstanceId2, "")
val topics = Set(testTopicName, testTopicName1, testTopicName2)
// We need to disable the auto commit because after the members got removed from group, the offset commit
// will cause the member rejoining and the test will be flaky (check ConsumerCoordinator#OffsetCommitResponseHandler)
val defaultConsumerConfig = new Properties(consumerConfig)
defaultConsumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
defaultConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, testGroupId)
defaultConsumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, testClientId)
val backgroundConsumers = prepareConsumers(groupInstances, topics, defaultConsumerConfig)
try {
val groupType = if (groupProtocol.equalsIgnoreCase(GroupProtocol.CONSUMER.name)) GroupType.CONSUMER else GroupType.CLASSIC
// Start consumer polling threads in the background
backgroundConsumers.start()
val describeWithFakeGroupResult = client.describeConsumerGroups(util.List.of(testGroupId, fakeGroupId), val describeWithFakeGroupResult = client.describeConsumerGroups(util.List.of(testGroupId, fakeGroupId),
new DescribeConsumerGroupsOptions().includeAuthorizedOperations(true)) new DescribeConsumerGroupsOptions().includeAuthorizedOperations(true))
@ -1940,17 +2020,14 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
assertEquals(testGroupId, testGroupDescription.groupId()) assertEquals(testGroupId, testGroupDescription.groupId())
assertFalse(testGroupDescription.isSimpleConsumerGroup) assertFalse(testGroupDescription.isSimpleConsumerGroup)
assertEquals(groupInstanceSet.size, testGroupDescription.members().size()) assertEquals(groupInstances.size, testGroupDescription.members().size())
val members = testGroupDescription.members() val members = testGroupDescription.members()
members.asScala.foreach { member => members.asScala.foreach { member =>
assertEquals(testClientId, member.clientId) assertEquals(testClientId, member.clientId)
assertEquals(if (groupType == GroupType.CLASSIC) Optional.empty else Optional.of(true), member.upgraded) assertEquals(if (groupType == GroupType.CLASSIC) Optional.empty else Optional.of(true), member.upgraded)
} }
val topicPartitionsByTopic = members.asScala.flatMap(_.assignment().topicPartitions().asScala).groupBy(_.topic()) val topicPartitionsByTopic = members.asScala.flatMap(_.assignment().topicPartitions().asScala).groupBy(_.topic())
topicSet.foreach { topic => topics.foreach(topic => assertEquals(testNumPartitions, topicPartitionsByTopic.getOrElse(topic, List.empty).size))
val topicPartitions = topicPartitionsByTopic.getOrElse(topic, List.empty)
assertEquals(testNumPartitions, topicPartitions.size)
}
val expectedOperations = AclEntry.supportedOperations(ResourceType.GROUP) val expectedOperations = AclEntry.supportedOperations(ResourceType.GROUP)
assertEquals(expectedOperations, testGroupDescription.authorizedOperations()) assertEquals(expectedOperations, testGroupDescription.authorizedOperations())
@ -1963,35 +2040,8 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
// Test that all() also throws GroupIdNotFoundException // Test that all() also throws GroupIdNotFoundException
assertFutureThrows(classOf[GroupIdNotFoundException], describeWithFakeGroupResult.all(), assertFutureThrows(classOf[GroupIdNotFoundException], describeWithFakeGroupResult.all(),
s"Group $fakeGroupId not found.") s"Group $fakeGroupId not found.")
val testTopicPart0 = new TopicPartition(testTopicName, 0)
// Test listConsumerGroupOffsets
TestUtils.waitUntilTrue(() => {
val parts = client.listConsumerGroupOffsets(testGroupId).partitionsToOffsetAndMetadata().get()
parts.containsKey(testTopicPart0) && (parts.get(testTopicPart0).offset() == 1)
}, s"Expected the offset for partition 0 to eventually become 1.")
// Test listConsumerGroupOffsets with requireStable true
val options = new ListConsumerGroupOffsetsOptions().requireStable(true)
var parts = client.listConsumerGroupOffsets(testGroupId, options)
.partitionsToOffsetAndMetadata().get()
assertTrue(parts.containsKey(testTopicPart0))
assertEquals(1, parts.get(testTopicPart0).offset())
// Test listConsumerGroupOffsets with listConsumerGroupOffsetsSpec
val groupSpecs = util.Map.of(testGroupId,
new ListConsumerGroupOffsetsSpec().topicPartitions(util.Set.of(new TopicPartition(testTopicName, 0))))
parts = client.listConsumerGroupOffsets(groupSpecs).partitionsToOffsetAndMetadata().get()
assertTrue(parts.containsKey(testTopicPart0))
assertEquals(1, parts.get(testTopicPart0).offset())
// Test listConsumerGroupOffsets with listConsumerGroupOffsetsSpec and requireStable option
parts = client.listConsumerGroupOffsets(groupSpecs, options).partitionsToOffsetAndMetadata().get()
assertTrue(parts.containsKey(testTopicPart0))
assertEquals(1, parts.get(testTopicPart0).offset())
} finally { } finally {
backgroundConsumerSet.close() backgroundConsumers.close()
} }
} finally { } finally {
Utils.closeQuietly(client, "adminClient") Utils.closeQuietly(client, "adminClient")
@ -2089,29 +2139,15 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
val config = createConfig val config = createConfig
client = Admin.create(config) client = Admin.create(config)
try { try {
// Verify that initially there are no consumer groups to list. assertConsumerGroupsIsClean()
val list1 = client.listConsumerGroups()
assertEquals(0, list1.all().get().size())
assertEquals(0, list1.errors().get().size())
assertEquals(0, list1.valid().get().size())
val testTopicName = "test_topic" val testTopicName = "test_topic"
val testTopicName1 = testTopicName + "1" val testTopicName1 = testTopicName + "1"
val testTopicName2 = testTopicName + "2" val testTopicName2 = testTopicName + "2"
val testNumPartitions = 2 val testNumPartitions = 2
client.createTopics(util.List.of( prepareTopics(List(testTopicName, testTopicName1, testTopicName2), testNumPartitions)
new NewTopic(testTopicName, testNumPartitions, 1.toShort), prepareRecords(testTopicName)
new NewTopic(testTopicName1, testNumPartitions, 1.toShort),
new NewTopic(testTopicName2, testNumPartitions, 1.toShort)
)).all().get()
waitForTopics(client, List(testTopicName, testTopicName1, testTopicName2), List())
val producer = createProducer()
try {
producer.send(new ProducerRecord(testTopicName, 0, null, null)).get()
} finally {
Utils.closeQuietly(producer, "producer")
}
val testGroupId = "test_group_id" val testGroupId = "test_group_id"
val testClientId = "test_client_id" val testClientId = "test_client_id"
@ -2291,28 +2327,15 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
client = Admin.create(config) client = Admin.create(config)
try { try {
// Verify that initially there are no consumer groups to list. // Verify that initially there are no consumer groups to list.
val list1 = client.listConsumerGroups() assertConsumerGroupsIsClean()
assertEquals(0, list1.all().get().size())
assertEquals(0, list1.errors().get().size())
assertEquals(0, list1.valid().get().size())
val testTopicName = "test_topic" val testTopicName = "test_topic"
val testTopicName1 = testTopicName + "1" val testTopicName1 = testTopicName + "1"
val testTopicName2 = testTopicName + "2" val testTopicName2 = testTopicName + "2"
val testNumPartitions = 2 val testNumPartitions = 2
client.createTopics(util.List.of( prepareTopics(List(testTopicName, testTopicName1, testTopicName2), testNumPartitions)
new NewTopic(testTopicName, testNumPartitions, 1.toShort),
new NewTopic(testTopicName1, testNumPartitions, 1.toShort),
new NewTopic(testTopicName2, testNumPartitions, 1.toShort)
)).all().get()
waitForTopics(client, List(testTopicName, testTopicName1, testTopicName2), List())
val producer = createProducer() prepareRecords(testTopicName)
try {
producer.send(new ProducerRecord(testTopicName, 0, null, null)).get()
} finally {
Utils.closeQuietly(producer, "producer")
}
val testGroupId = "test_group_id" val testGroupId = "test_group_id"
val testClientId = "test_client_id" val testClientId = "test_client_id"
@ -2461,12 +2484,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
new NewTopic(testTopicName, 1, 1.toShort))).all().get() new NewTopic(testTopicName, 1, 1.toShort))).all().get()
waitForTopics(client, List(testTopicName), List()) waitForTopics(client, List(testTopicName), List())
val producer = createProducer() prepareRecords(testTopicName)
try {
producer.send(new ProducerRecord(testTopicName, 0, null, null)).get()
} finally {
Utils.closeQuietly(producer, "producer")
}
val newConsumerConfig = new Properties(consumerConfig) val newConsumerConfig = new Properties(consumerConfig)
newConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, testGroupId) newConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, testGroupId)
@ -2511,6 +2529,43 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
} }
} }
private def prepareTopics(topics: List[String], numberOfPartitions: Int): Unit = {
client.createTopics(topics.map(topic => new NewTopic(topic, numberOfPartitions, 1.toShort)).asJava).all().get()
waitForTopics(client, topics, List())
}
private def prepareRecords(testTopicName: String) = {
val producer = createProducer()
try {
producer.send(new ProducerRecord(testTopicName, 0, null, null)).get()
} finally {
Utils.closeQuietly(producer, "producer")
}
}
private def prepareConsumers(groupInstanceSet: Set[String], topicSet: Set[String], defaultConsumerConfig: Properties) = {
val backgroundConsumerSet = new BackgroundConsumerSet(defaultConsumerConfig)
groupInstanceSet.zip(topicSet).foreach { case (groupInstanceId, topic) =>
val configOverrides = new Properties()
if (groupInstanceId != "") {
// static member
configOverrides.setProperty(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, groupInstanceId)
}
backgroundConsumerSet.addConsumer(topic, configOverrides)
}
backgroundConsumerSet
}
/**
* Verify that initially there are no consumer groups to list.
*/
private def assertConsumerGroupsIsClean(): Unit = {
val listResult = client.listConsumerGroups()
assertEquals(0, listResult.all().get().size())
assertEquals(0, listResult.errors().get().size())
assertEquals(0, listResult.valid().get().size())
}
@Test @Test
def testListGroups(): Unit = { def testListGroups(): Unit = {
val classicGroupId = "classic_group_id" val classicGroupId = "classic_group_id"