KAFKA-18298 Fix flaky testConsumerGroupsDeprecatedConsumerGroupState and testConsumerGroups in PlaintextAdminIntegrationTest (#18513)

It's related to KAFKA-18298 and KAFKA-18297. The root cause of the flaky tests is member rejoin after member removal. To prevent members from rejoining after being removed, before removing group members, calling `consumers.close` in ConsumerThread . This fix also extract the flaky member removal test  to new test `testConsumerGroupWithMemberRemoval`.

Flow of member removal test: 
1. Set 2 static consumer + 1 dynamic consumer
2. Close all consumers.
3. remove one static member
4. remove remaining members
 
Before KIP-1092, the member count is different between ClassicConsumer/AsyncConsumer. (AsyncConsumer will remove dynamic member after consumer closed.)

To get more details, please refer to the discussion under KAFKA-18297 and this PR:
- discussion : [Link](https://issues.apache.org/jira/browse/KAFKA-18297?focusedCommentId=17912537&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17912537)
- review: https://github.com/apache/kafka/pull/18513#pullrequestreview-2589110367

This PR fixed below flaky errors:

1. **PlaintextAdminIntegrationTest#testConsumerGroups**
  a.  `org.opentest4j.AssertionFailedError: expected: <2> but was: <3>` ([Report](https://ge.apache.org/s/lt3lpviv45cns/tests/task/:core:test/details/kafka.api.PlaintextAdminIntegrationTest/testConsumerGroups(String%2C%20String)%5B1%5D?top-execution=1))
  b.  `org.opentest4j.AssertionFailedError: expected: <true> but was: <false>` ([Report](https://ge.apache.org/s/jlxo446xalpoa/tests/task/:core:test/details/kafka.api.PlaintextAdminIntegrationTest/testConsumerGroups(String%2C%20String)%5B1%5D?top-execution=1))

2. **PlaintextAdminIntegrationTest#testConsumerGroupsDeprecatedConsumerGroupState**
  a.  `org.opentest4j.AssertionFailedError: expected: <2> but was: <3>` ([Report](https://ge.apache.org/s/ndoj6s2stb446/tests/task/:core:test/details/kafka.api.PlaintextAdminIntegrationTest/testConsumerGroupsDeprecatedConsumerGroupState(String%2C%20String)%5B1%5D?top-execution=1))
  b. `org.opentest4j.AssertionFailedError: expected: <true> but was: <false>` ([Report](https://ge.apache.org/s/kh3jze2tc5qeu/tests/task/:core:test/details/kafka.api.PlaintextAdminIntegrationTest/testConsumerGroupsDeprecatedConsumerGroupState(String%2C%20String)%5B1%5D?top-execution=1))

Reviewers: David Jacot <djacot@confluent.io>, TaiJuWu <tjwu1217@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
陳昱霖(Yu-Lin Chen) 2025-02-14 07:28:45 +08:00 committed by GitHub
parent e6b835f0b4
commit 2bbd25841e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 495 additions and 473 deletions

View File

@ -25,7 +25,7 @@ import java.time.{Duration => JDuration}
import java.util.Arrays.asList import java.util.Arrays.asList
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger} import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import java.util.concurrent.{CountDownLatch, ExecutionException, TimeUnit} import java.util.concurrent.{CountDownLatch, ExecutionException, TimeUnit}
import java.util.{Collections, Optional, Properties} import java.util.{Collections, Locale, Optional, Properties}
import java.{time, util} import java.{time, util}
import kafka.integration.KafkaServerTestHarness import kafka.integration.KafkaServerTestHarness
import kafka.server.KafkaConfig import kafka.server.KafkaConfig
@ -1838,62 +1838,38 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
Utils.closeQuietly(producer, "producer") Utils.closeQuietly(producer, "producer")
} }
val EMPTY_GROUP_INSTANCE_ID = ""
val testGroupId = "test_group_id" val testGroupId = "test_group_id"
val testClientId = "test_client_id" val testClientId = "test_client_id"
val testInstanceId1 = "test_instance_id_1" val testInstanceId1 = "test_instance_id_1"
val testInstanceId2 = "test_instance_id_2" val testInstanceId2 = "test_instance_id_2"
val fakeGroupId = "fake_group_id" val fakeGroupId = "fake_group_id"
def createProperties(groupInstanceId: String): Properties = {
val newConsumerConfig = new Properties(consumerConfig)
// We need to disable the auto commit because after the members got removed from group, the offset commit
// will cause the member rejoining and the test will be flaky (check ConsumerCoordinator#OffsetCommitResponseHandler)
newConsumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
newConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, testGroupId)
newConsumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, testClientId)
if (groupInstanceId != EMPTY_GROUP_INSTANCE_ID) {
newConsumerConfig.setProperty(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, groupInstanceId)
}
newConsumerConfig
}
// contains two static members and one dynamic member // contains two static members and one dynamic member
val groupInstanceSet = Set(testInstanceId1, testInstanceId2, EMPTY_GROUP_INSTANCE_ID) val groupInstanceSet = Set(testInstanceId1, testInstanceId2, "")
val consumerSet = groupInstanceSet.map { groupInstanceId => createConsumer(configOverrides = createProperties(groupInstanceId))}
val topicSet = Set(testTopicName, testTopicName1, testTopicName2) val topicSet = Set(testTopicName, testTopicName1, testTopicName2)
val latch = new CountDownLatch(consumerSet.size) // We need to disable the auto commit because after the members got removed from group, the offset commit
try { // will cause the member rejoining and the test will be flaky (check ConsumerCoordinator#OffsetCommitResponseHandler)
def createConsumerThread[K,V](consumer: Consumer[K,V], topic: String): Thread = { val defaultConsumerConfig = new Properties(consumerConfig)
new Thread { defaultConsumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
override def run : Unit = { defaultConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, testGroupId)
consumer.subscribe(Collections.singleton(topic)) defaultConsumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, testClientId)
try {
while (true) { val backgroundConsumerSet = new BackgroundConsumerSet(defaultConsumerConfig)
consumer.poll(JDuration.ofSeconds(5)) groupInstanceSet.zip(topicSet).foreach { case (groupInstanceId, topic) =>
if (!consumer.assignment.isEmpty && latch.getCount > 0L) val configOverrides = new Properties()
latch.countDown() if (groupInstanceId != "") {
try { // static member
consumer.commitSync() configOverrides.setProperty(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, groupInstanceId)
} catch {
case _: CommitFailedException => // Ignore and retry on next iteration.
}
}
} catch {
case _: InterruptException => // Suppress the output to stderr
}
}
} }
backgroundConsumerSet.addConsumer(topic, configOverrides)
} }
// Start consumers in a thread that will subscribe to a new group. try {
val consumerThreads = consumerSet.zip(topicSet).map(zipped => createConsumerThread(zipped._1, zipped._2))
val groupType = if (groupProtocol.equalsIgnoreCase(GroupProtocol.CONSUMER.name)) GroupType.CONSUMER else GroupType.CLASSIC val groupType = if (groupProtocol.equalsIgnoreCase(GroupProtocol.CONSUMER.name)) GroupType.CONSUMER else GroupType.CLASSIC
// Start consumer polling threads in the background
backgroundConsumerSet.start()
try {
consumerThreads.foreach(_.start())
assertTrue(latch.await(30000, TimeUnit.MILLISECONDS))
// Test that we can list the new group. // Test that we can list the new group.
TestUtils.waitUntilTrue(() => { TestUtils.waitUntilTrue(() => {
val matching = client.listConsumerGroups.all.get.asScala.filter(group => val matching = client.listConsumerGroups.all.get.asScala.filter(group =>
@ -1940,7 +1916,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
// Test that we can get information about the test consumer group. // Test that we can get information about the test consumer group.
assertTrue(describeWithFakeGroupResult.describedGroups().containsKey(testGroupId)) assertTrue(describeWithFakeGroupResult.describedGroups().containsKey(testGroupId))
var testGroupDescription = describeWithFakeGroupResult.describedGroups().get(testGroupId).get() val testGroupDescription = describeWithFakeGroupResult.describedGroups().get(testGroupId).get()
if (groupType == GroupType.CLASSIC) { if (groupType == GroupType.CLASSIC) {
assertTrue(testGroupDescription.groupEpoch.isEmpty) assertTrue(testGroupDescription.groupEpoch.isEmpty)
assertTrue(testGroupDescription.targetAssignmentEpoch.isEmpty) assertTrue(testGroupDescription.targetAssignmentEpoch.isEmpty)
@ -2001,87 +1977,8 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
parts = client.listConsumerGroupOffsets(groupSpecs, options).partitionsToOffsetAndMetadata().get() parts = client.listConsumerGroupOffsets(groupSpecs, options).partitionsToOffsetAndMetadata().get()
assertTrue(parts.containsKey(testTopicPart0)) assertTrue(parts.containsKey(testTopicPart0))
assertEquals(1, parts.get(testTopicPart0).offset()) assertEquals(1, parts.get(testTopicPart0).offset())
// Test delete non-exist consumer instance
val invalidInstanceId = "invalid-instance-id"
var removeMembersResult = client.removeMembersFromConsumerGroup(testGroupId, new RemoveMembersFromConsumerGroupOptions(
Collections.singleton(new MemberToRemove(invalidInstanceId))
))
assertFutureThrows(removeMembersResult.all, classOf[UnknownMemberIdException])
val firstMemberFuture = removeMembersResult.memberResult(new MemberToRemove(invalidInstanceId))
assertFutureThrows(firstMemberFuture, classOf[UnknownMemberIdException])
// Test consumer group deletion
var deleteResult = client.deleteConsumerGroups(Seq(testGroupId, fakeGroupId).asJava)
assertEquals(2, deleteResult.deletedGroups().size())
// Deleting the fake group ID should get GroupIdNotFoundException.
assertTrue(deleteResult.deletedGroups().containsKey(fakeGroupId))
assertFutureThrows(deleteResult.deletedGroups().get(fakeGroupId),
classOf[GroupIdNotFoundException])
// Deleting the real group ID should get GroupNotEmptyException
assertTrue(deleteResult.deletedGroups().containsKey(testGroupId))
assertFutureThrows(deleteResult.deletedGroups().get(testGroupId),
classOf[GroupNotEmptyException])
// Test delete one correct static member
val removeOptions = new RemoveMembersFromConsumerGroupOptions(Collections.singleton(new MemberToRemove(testInstanceId1)))
removeOptions.reason("test remove")
removeMembersResult = client.removeMembersFromConsumerGroup(testGroupId, removeOptions)
assertNull(removeMembersResult.all().get())
val validMemberFuture = removeMembersResult.memberResult(new MemberToRemove(testInstanceId1))
assertNull(validMemberFuture.get())
val describeTestGroupResult = client.describeConsumerGroups(Seq(testGroupId).asJava,
new DescribeConsumerGroupsOptions().includeAuthorizedOperations(true))
assertEquals(1, describeTestGroupResult.describedGroups().size())
testGroupDescription = describeTestGroupResult.describedGroups().get(testGroupId).get()
assertEquals(testGroupId, testGroupDescription.groupId)
assertFalse(testGroupDescription.isSimpleConsumerGroup)
assertEquals(consumerSet.size - 1, testGroupDescription.members().size())
// Delete all active members remaining (a static member + a dynamic member)
removeMembersResult = client.removeMembersFromConsumerGroup(testGroupId, new RemoveMembersFromConsumerGroupOptions())
assertNull(removeMembersResult.all().get())
// The group should contain no members now.
testGroupDescription = client.describeConsumerGroups(Seq(testGroupId).asJava,
new DescribeConsumerGroupsOptions().includeAuthorizedOperations(true))
.describedGroups().get(testGroupId).get()
assertTrue(testGroupDescription.members().isEmpty)
// Consumer group deletion on empty group should succeed
deleteResult = client.deleteConsumerGroups(Seq(testGroupId).asJava)
assertEquals(1, deleteResult.deletedGroups().size())
assertTrue(deleteResult.deletedGroups().containsKey(testGroupId))
assertNull(deleteResult.deletedGroups().get(testGroupId).get())
// Test alterConsumerGroupOffsets
val alterConsumerGroupOffsetsResult = client.alterConsumerGroupOffsets(testGroupId,
Collections.singletonMap(testTopicPart0, new OffsetAndMetadata(0L)))
assertNull(alterConsumerGroupOffsetsResult.all().get())
assertNull(alterConsumerGroupOffsetsResult.partitionResult(testTopicPart0).get())
// Verify alterConsumerGroupOffsets success
TestUtils.waitUntilTrue(() => {
val parts = client.listConsumerGroupOffsets(testGroupId).partitionsToOffsetAndMetadata().get()
parts.containsKey(testTopicPart0) && (parts.get(testTopicPart0).offset() == 0)
}, s"Expected the offset for partition 0 to eventually become 0.")
} finally { } finally {
consumerThreads.foreach { backgroundConsumerSet.close()
case consumerThread =>
consumerThread.interrupt()
consumerThread.join()
}
}
} finally {
consumerSet.zip(groupInstanceSet).foreach(zipped => Utils.closeQuietly(zipped._1, zipped._2))
} }
} finally { } finally {
Utils.closeQuietly(client, "adminClient") Utils.closeQuietly(client, "adminClient")
@ -2204,62 +2101,38 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
Utils.closeQuietly(producer, "producer") Utils.closeQuietly(producer, "producer")
} }
val EMPTY_GROUP_INSTANCE_ID = ""
val testGroupId = "test_group_id" val testGroupId = "test_group_id"
val testClientId = "test_client_id" val testClientId = "test_client_id"
val testInstanceId1 = "test_instance_id_1" val testInstanceId1 = "test_instance_id_1"
val testInstanceId2 = "test_instance_id_2" val testInstanceId2 = "test_instance_id_2"
val fakeGroupId = "fake_group_id" val fakeGroupId = "fake_group_id"
def createProperties(groupInstanceId: String): Properties = {
val newConsumerConfig = new Properties(consumerConfig)
// We need to disable the auto commit because after the members got removed from group, the offset commit
// will cause the member rejoining and the test will be flaky (check ConsumerCoordinator#OffsetCommitResponseHandler)
newConsumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
newConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, testGroupId)
newConsumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, testClientId)
if (groupInstanceId != EMPTY_GROUP_INSTANCE_ID) {
newConsumerConfig.setProperty(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, groupInstanceId)
}
newConsumerConfig
}
// contains two static members and one dynamic member // contains two static members and one dynamic member
val groupInstanceSet = Set(testInstanceId1, testInstanceId2, EMPTY_GROUP_INSTANCE_ID) val groupInstanceSet = Set(testInstanceId1, testInstanceId2, "")
val consumerSet = groupInstanceSet.map { groupInstanceId => createConsumer(configOverrides = createProperties(groupInstanceId))}
val topicSet = Set(testTopicName, testTopicName1, testTopicName2) val topicSet = Set(testTopicName, testTopicName1, testTopicName2)
val latch = new CountDownLatch(consumerSet.size) // We need to disable the auto commit because after the members got removed from group, the offset commit
try { // will cause the member rejoining and the test will be flaky (check ConsumerCoordinator#OffsetCommitResponseHandler)
def createConsumerThread[K,V](consumer: Consumer[K,V], topic: String): Thread = { val defaultConsumerConfig = new Properties(consumerConfig)
new Thread { defaultConsumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
override def run : Unit = { defaultConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, testGroupId)
consumer.subscribe(Collections.singleton(topic)) defaultConsumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, testClientId)
try {
while (true) { val backgroundConsumerSet = new BackgroundConsumerSet(defaultConsumerConfig)
consumer.poll(JDuration.ofSeconds(5)) groupInstanceSet.zip(topicSet).foreach { case (groupInstanceId, topic) =>
if (!consumer.assignment.isEmpty && latch.getCount > 0L) val configOverrides = new Properties()
latch.countDown() if (groupInstanceId != "") {
try { // static member
consumer.commitSync() configOverrides.setProperty(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, groupInstanceId)
} catch {
case _: CommitFailedException => // Ignore and retry on next iteration.
}
}
} catch {
case _: InterruptException => // Suppress the output to stderr
}
}
} }
backgroundConsumerSet.addConsumer(topic, configOverrides)
} }
// Start consumers in a thread that will subscribe to a new group. try {
val consumerThreads = consumerSet.zip(topicSet).map(zipped => createConsumerThread(zipped._1, zipped._2))
val groupType = if (groupProtocol.equalsIgnoreCase(GroupProtocol.CONSUMER.name)) GroupType.CONSUMER else GroupType.CLASSIC val groupType = if (groupProtocol.equalsIgnoreCase(GroupProtocol.CONSUMER.name)) GroupType.CONSUMER else GroupType.CLASSIC
// Start consumer polling threads in the background
backgroundConsumerSet.start()
try {
consumerThreads.foreach(_.start())
assertTrue(latch.await(30000, TimeUnit.MILLISECONDS))
// Test that we can list the new group. // Test that we can list the new group.
TestUtils.waitUntilTrue(() => { TestUtils.waitUntilTrue(() => {
val matching = client.listConsumerGroups.all.get.asScala.filter(group => val matching = client.listConsumerGroups.all.get.asScala.filter(group =>
@ -2336,7 +2209,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
// Test that we can get information about the test consumer group. // Test that we can get information about the test consumer group.
assertTrue(describeWithFakeGroupResult.describedGroups().containsKey(testGroupId)) assertTrue(describeWithFakeGroupResult.describedGroups().containsKey(testGroupId))
var testGroupDescription = describeWithFakeGroupResult.describedGroups().get(testGroupId).get() val testGroupDescription = describeWithFakeGroupResult.describedGroups().get(testGroupId).get()
assertEquals(testGroupId, testGroupDescription.groupId()) assertEquals(testGroupId, testGroupDescription.groupId())
assertFalse(testGroupDescription.isSimpleConsumerGroup) assertFalse(testGroupDescription.isSimpleConsumerGroup)
@ -2387,13 +2260,87 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
parts = client.listConsumerGroupOffsets(groupSpecs, options).partitionsToOffsetAndMetadata().get() parts = client.listConsumerGroupOffsets(groupSpecs, options).partitionsToOffsetAndMetadata().get()
assertTrue(parts.containsKey(testTopicPart0)) assertTrue(parts.containsKey(testTopicPart0))
assertEquals(1, parts.get(testTopicPart0).offset()) assertEquals(1, parts.get(testTopicPart0).offset())
} finally {
backgroundConsumerSet.close()
}
} finally {
Utils.closeQuietly(client, "adminClient")
}
}
/**
* Test the consumer group APIs for member removal.
*/
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testConsumerGroupWithMemberRemoval(quorum: String, groupProtocol: String): Unit = {
val config = createConfig
client = Admin.create(config)
try {
// Verify that initially there are no consumer groups to list.
val list1 = client.listConsumerGroups()
assertEquals(0, list1.all().get().size())
assertEquals(0, list1.errors().get().size())
assertEquals(0, list1.valid().get().size())
val testTopicName = "test_topic"
val testTopicName1 = testTopicName + "1"
val testTopicName2 = testTopicName + "2"
val testNumPartitions = 2
client.createTopics(util.Arrays.asList(
new NewTopic(testTopicName, testNumPartitions, 1.toShort),
new NewTopic(testTopicName1, testNumPartitions, 1.toShort),
new NewTopic(testTopicName2, testNumPartitions, 1.toShort)
)).all().get()
waitForTopics(client, List(testTopicName, testTopicName1, testTopicName2), List())
val producer = createProducer()
try {
producer.send(new ProducerRecord(testTopicName, 0, null, null)).get()
} finally {
Utils.closeQuietly(producer, "producer")
}
val testGroupId = "test_group_id"
val testClientId = "test_client_id"
val testInstanceId1 = "test_instance_id_1"
val testInstanceId2 = "test_instance_id_2"
val fakeGroupId = "fake_group_id"
// contains two static members and one dynamic member
val groupInstanceSet = Set(testInstanceId1, testInstanceId2, "")
val topicSet = Set(testTopicName, testTopicName1, testTopicName2)
// We need to disable the auto commit because after the members got removed from group, the offset commit
// will cause the member rejoining and the test will be flaky (check ConsumerCoordinator#OffsetCommitResponseHandler)
val defaultConsumerConfig = new Properties(consumerConfig)
defaultConsumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
defaultConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, testGroupId)
defaultConsumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, testClientId)
// We need to set internal.leave.group.on.close to validate dynamic member removal, but it only works for ClassicConsumer
// After KIP-1092, we can control dynamic member removal for both ClassicConsumer and AsyncConsumer
defaultConsumerConfig.setProperty("internal.leave.group.on.close", "false")
val backgroundConsumerSet = new BackgroundConsumerSet(defaultConsumerConfig)
groupInstanceSet.zip(topicSet).foreach { case (groupInstanceId, topic) =>
val configOverrides = new Properties()
if (groupInstanceId != "") {
// static member
configOverrides.setProperty(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, groupInstanceId)
}
backgroundConsumerSet.addConsumer(topic, configOverrides)
}
try {
// Start consumer polling threads in the background
backgroundConsumerSet.start()
// Test delete non-exist consumer instance // Test delete non-exist consumer instance
val invalidInstanceId = "invalid-instance-id" val invalidInstanceId = "invalid-instance-id"
var removeMembersResult = client.removeMembersFromConsumerGroup(testGroupId, new RemoveMembersFromConsumerGroupOptions( var removeMembersResult = client.removeMembersFromConsumerGroup(testGroupId, new RemoveMembersFromConsumerGroupOptions(
Collections.singleton(new MemberToRemove(invalidInstanceId)) Collections.singleton(new MemberToRemove(invalidInstanceId))
)) ))
assertFutureThrows(removeMembersResult.all, classOf[UnknownMemberIdException]) assertFutureThrows(removeMembersResult.all, classOf[UnknownMemberIdException])
val firstMemberFuture = removeMembersResult.memberResult(new MemberToRemove(invalidInstanceId)) val firstMemberFuture = removeMembersResult.memberResult(new MemberToRemove(invalidInstanceId))
assertFutureThrows(firstMemberFuture, classOf[UnknownMemberIdException]) assertFutureThrows(firstMemberFuture, classOf[UnknownMemberIdException])
@ -2412,33 +2359,50 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
assertFutureThrows(deleteResult.deletedGroups().get(testGroupId), assertFutureThrows(deleteResult.deletedGroups().get(testGroupId),
classOf[GroupNotEmptyException]) classOf[GroupNotEmptyException])
// Test delete one correct static member // Stop the consumer threads and close consumers to prevent member rejoining.
val removeOptions = new RemoveMembersFromConsumerGroupOptions(Collections.singleton(new MemberToRemove(testInstanceId1))) backgroundConsumerSet.stop()
removeOptions.reason("test remove")
removeMembersResult = client.removeMembersFromConsumerGroup(testGroupId, removeOptions)
assertNull(removeMembersResult.all().get()) // Check the members in the group after consumers have stopped
val validMemberFuture = removeMembersResult.memberResult(new MemberToRemove(testInstanceId1)) var describeTestGroupResult = client.describeConsumerGroups(Seq(testGroupId).asJava,
assertNull(validMemberFuture.get())
val describeTestGroupResult = client.describeConsumerGroups(Seq(testGroupId).asJava,
new DescribeConsumerGroupsOptions().includeAuthorizedOperations(true)) new DescribeConsumerGroupsOptions().includeAuthorizedOperations(true))
assertEquals(1, describeTestGroupResult.describedGroups().size()) assertEquals(1, describeTestGroupResult.describedGroups().size())
testGroupDescription = describeTestGroupResult.describedGroups().get(testGroupId).get() var testGroupDescription = describeTestGroupResult.describedGroups().get(testGroupId).get()
assertEquals(testGroupId, testGroupDescription.groupId) assertEquals(testGroupId, testGroupDescription.groupId)
assertFalse(testGroupDescription.isSimpleConsumerGroup) assertFalse(testGroupDescription.isSimpleConsumerGroup)
assertEquals(consumerSet.size - 1, testGroupDescription.members().size())
// Delete all active members remaining (a static member + a dynamic member) // Although we set `internal.leave.group.on.close` in the consumer, it only works for ClassicConsumer.
// After KIP-1092, we can control dynamic member removal in consumer.close()
if (groupProtocol == GroupProtocol.CLASSIC.name.toLowerCase(Locale.ROOT)) {
assertEquals(3, testGroupDescription.members().size())
} else if (groupProtocol == GroupProtocol.CONSUMER.name.toLowerCase(Locale.ROOT)) {
assertEquals(2, testGroupDescription.members().size())
}
// Test delete one static member
removeMembersResult = client.removeMembersFromConsumerGroup(testGroupId,
new RemoveMembersFromConsumerGroupOptions(Collections.singleton(new MemberToRemove(testInstanceId1))))
assertNull(removeMembersResult.all().get())
assertNull(removeMembersResult.memberResult(new MemberToRemove(testInstanceId1)).get())
describeTestGroupResult = client.describeConsumerGroups(Seq(testGroupId).asJava,
new DescribeConsumerGroupsOptions().includeAuthorizedOperations(true))
testGroupDescription = describeTestGroupResult.describedGroups().get(testGroupId).get()
if (groupProtocol == GroupProtocol.CLASSIC.name.toLowerCase(Locale.ROOT)) {
assertEquals(2, testGroupDescription.members().size())
} else if (groupProtocol == GroupProtocol.CONSUMER.name.toLowerCase(Locale.ROOT)) {
assertEquals(1, testGroupDescription.members().size())
}
// Delete all active members remaining
removeMembersResult = client.removeMembersFromConsumerGroup(testGroupId, new RemoveMembersFromConsumerGroupOptions()) removeMembersResult = client.removeMembersFromConsumerGroup(testGroupId, new RemoveMembersFromConsumerGroupOptions())
assertNull(removeMembersResult.all().get()) assertNull(removeMembersResult.all().get())
// The group should contain no members now. // The group should contain no members now.
testGroupDescription = client.describeConsumerGroups(Seq(testGroupId).asJava, testGroupDescription = client.describeConsumerGroups(Seq(testGroupId).asJava,
new DescribeConsumerGroupsOptions().includeAuthorizedOperations(true)) new DescribeConsumerGroupsOptions().includeAuthorizedOperations(true)).describedGroups().get(testGroupId).get()
.describedGroups().get(testGroupId).get()
assertTrue(testGroupDescription.members().isEmpty) assertTrue(testGroupDescription.members().isEmpty)
// Consumer group deletion on empty group should succeed // Consumer group deletion on empty group should succeed
@ -2448,7 +2412,8 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
assertTrue(deleteResult.deletedGroups().containsKey(testGroupId)) assertTrue(deleteResult.deletedGroups().containsKey(testGroupId))
assertNull(deleteResult.deletedGroups().get(testGroupId).get()) assertNull(deleteResult.deletedGroups().get(testGroupId).get())
// Test alterConsumerGroupOffsets // Test alterConsumerGroupOffsets when group is empty
val testTopicPart0 = new TopicPartition(testTopicName, 0)
val alterConsumerGroupOffsetsResult = client.alterConsumerGroupOffsets(testGroupId, val alterConsumerGroupOffsetsResult = client.alterConsumerGroupOffsets(testGroupId,
Collections.singletonMap(testTopicPart0, new OffsetAndMetadata(0L))) Collections.singletonMap(testTopicPart0, new OffsetAndMetadata(0L)))
assertNull(alterConsumerGroupOffsetsResult.all().get()) assertNull(alterConsumerGroupOffsetsResult.all().get())
@ -2460,14 +2425,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
parts.containsKey(testTopicPart0) && (parts.get(testTopicPart0).offset() == 0) parts.containsKey(testTopicPart0) && (parts.get(testTopicPart0).offset() == 0)
}, s"Expected the offset for partition 0 to eventually become 0.") }, s"Expected the offset for partition 0 to eventually become 0.")
} finally { } finally {
consumerThreads.foreach { backgroundConsumerSet.close()
case consumerThread =>
consumerThread.interrupt()
consumerThread.join()
}
}
} finally {
consumerSet.zip(groupInstanceSet).foreach(zipped => Utils.closeQuietly(zipped._1, zipped._2))
} }
} finally { } finally {
Utils.closeQuietly(client, "adminClient") Utils.closeQuietly(client, "adminClient")
@ -4003,6 +3961,70 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
ConfigSource.DYNAMIC_TOPIC_CONFIG, false, false, Collections.emptyList(), null, null), ConfigSource.DYNAMIC_TOPIC_CONFIG, false, false, Collections.emptyList(), null, null),
topicConfigs.get(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG)) topicConfigs.get(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG))
} }
class BackgroundConsumerSet(defaultConsumerConfig: Properties) {
private val consumerSet: scala.collection.mutable.Set[Consumer[Array[Byte], Array[Byte]]] = scala.collection.mutable.Set.empty
private val consumerThreads: scala.collection.mutable.Set[Thread] = scala.collection.mutable.Set.empty
private var startLatch: CountDownLatch = new CountDownLatch(0)
private var stopLatch: CountDownLatch = new CountDownLatch(0)
private var consumerThreadRunning = new AtomicBoolean(false)
def addConsumer(topic: String, configOverrides: Properties = new Properties()): Unit = {
val newConsumerConfig = defaultConsumerConfig.clone().asInstanceOf[Properties]
newConsumerConfig.putAll(configOverrides)
val consumer = createConsumer(configOverrides = newConsumerConfig)
val consumerThread = createConsumerThread(consumer, topic)
consumerSet.add(consumer)
consumerThreads.add(consumerThread)
}
def start(): Unit = {
startLatch = new CountDownLatch(consumerSet.size)
stopLatch = new CountDownLatch(consumerSet.size)
consumerThreadRunning = new AtomicBoolean(true)
consumerThreads.foreach(_.start())
assertTrue(startLatch.await(30000, TimeUnit.MILLISECONDS), "Failed to start consumer threads in time")
}
def stop(): Unit = {
consumerSet.foreach(_.wakeup())
consumerThreadRunning.set(false)
assertTrue(stopLatch.await(30000, TimeUnit.MILLISECONDS), "Failed to stop consumer threads in time")
}
def close(): Unit = {
// stop the consumers and wait for consumer threads stopped
stop()
consumerThreads.foreach(_.join())
}
private def createConsumerThread[K,V](consumer: Consumer[K,V], topic: String): Thread = {
new Thread {
override def run : Unit = {
consumer.subscribe(Collections.singleton(topic))
try {
while (consumerThreadRunning.get()) {
consumer.poll(JDuration.ofSeconds(5))
if (!consumer.assignment.isEmpty && startLatch.getCount > 0L)
startLatch.countDown()
try {
consumer.commitSync()
} catch {
case _: CommitFailedException => // Ignore and retry on next iteration.
}
}
} catch {
case _: WakeupException => // ignore
case _: InterruptException => // ignore
} finally {
consumer.close()
stopLatch.countDown()
}
}
}
}
}
} }
object PlaintextAdminIntegrationTest { object PlaintextAdminIntegrationTest {