diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala index dc24bd84c59..b99cdc73ce3 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala @@ -1818,48 +1818,22 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { assertEquals(1, factory.failuresInjected) } - /** - * Test the consumer group APIs. - */ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) @MethodSource(Array("getTestGroupProtocolParametersAll")) - def testConsumerGroups(groupProtocol: String): Unit = { + def testListConsumerGroupOffsets(groupProtocol: String): Unit = { val config = createConfig client = Admin.create(config) try { - // Verify that initially there are no consumer groups to list. - val list1 = client.listConsumerGroups() - assertEquals(0, list1.all().get().size()) - assertEquals(0, list1.errors().get().size()) - assertEquals(0, list1.valid().get().size()) + assertConsumerGroupsIsClean() + val testTopicName = "test_topic" - val testTopicName1 = testTopicName + "1" - val testTopicName2 = testTopicName + "2" - val testNumPartitions = 2 - - client.createTopics(util.List.of( - new NewTopic(testTopicName, testNumPartitions, 1.toShort), - new NewTopic(testTopicName1, testNumPartitions, 1.toShort), - new NewTopic(testTopicName2, testNumPartitions, 1.toShort) - )).all().get() - waitForTopics(client, List(testTopicName, testTopicName1, testTopicName2), List()) - - val producer = createProducer() - try { - producer.send(new ProducerRecord(testTopicName, 0, null, null)).get() - } finally { - Utils.closeQuietly(producer, "producer") - } + prepareTopics(List(testTopicName), 2) + prepareRecords(testTopicName) val testGroupId = "test_group_id" val testClientId = "test_client_id" - val testInstanceId1 = "test_instance_id_1" - val testInstanceId2 = "test_instance_id_2" - val fakeGroupId = "fake_group_id" - - // contains two static members and one dynamic member - val groupInstanceSet = Set(testInstanceId1, testInstanceId2, "") - val topicSet = Set(testTopicName, testTopicName1, testTopicName2) + val groupInstances = Set("") + val topics = Set(testTopicName) // We need to disable the auto commit because after the members got removed from group, the offset commit // will cause the member rejoining and the test will be flaky (check ConsumerCoordinator#OffsetCommitResponseHandler) @@ -1867,27 +1841,85 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { defaultConsumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") defaultConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, testGroupId) defaultConsumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, testClientId) + val backgroundConsumers = prepareConsumers(groupInstances, topics, defaultConsumerConfig) - val backgroundConsumerSet = new BackgroundConsumerSet(defaultConsumerConfig) - groupInstanceSet.zip(topicSet).foreach { case (groupInstanceId, topic) => - val configOverrides = new Properties() - if (groupInstanceId != "") { - // static member - configOverrides.setProperty(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, groupInstanceId) - } - backgroundConsumerSet.addConsumer(topic, configOverrides) + try { + // Start consumer polling threads in the background + backgroundConsumers.start() + val topicPartition = new TopicPartition(testTopicName, 0) + + // Test listConsumerGroupOffsets + TestUtils.waitUntilTrue(() => { + val parts = client.listConsumerGroupOffsets(testGroupId).partitionsToOffsetAndMetadata().get() + parts.containsKey(topicPartition) && (parts.get(topicPartition).offset() == 1) + }, "Expected the offset for partition 0 to eventually become 1.") + + // Test listConsumerGroupOffsets with requireStable true + val options = new ListConsumerGroupOffsetsOptions().requireStable(true) + var parts = client.listConsumerGroupOffsets(testGroupId, options) + .partitionsToOffsetAndMetadata() + .get() + assertTrue(parts.containsKey(topicPartition)) + assertEquals(1, parts.get(topicPartition).offset()) + + // Test listConsumerGroupOffsets with listConsumerGroupOffsetsSpec + val groupSpecs = util.Map.of( + testGroupId, + new ListConsumerGroupOffsetsSpec().topicPartitions(util.List.of(new TopicPartition(testTopicName, 0))) + ) + parts = client.listConsumerGroupOffsets(groupSpecs) + .partitionsToOffsetAndMetadata() + .get() + assertTrue(parts.containsKey(topicPartition)) + assertEquals(1, parts.get(topicPartition).offset()) + + // Test listConsumerGroupOffsets with listConsumerGroupOffsetsSpec and requireStable option + parts = client.listConsumerGroupOffsets(groupSpecs, options) + .partitionsToOffsetAndMetadata() + .get() + assertTrue(parts.containsKey(topicPartition)) + assertEquals(1, parts.get(topicPartition).offset()) + } finally { + backgroundConsumers.close() } + } finally { + Utils.closeQuietly(client, "adminClient") + } + } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) + @MethodSource(Array("getTestGroupProtocolParametersAll")) + def testListConsumerGroups(groupProtocol: String): Unit = { + val config = createConfig + client = Admin.create(config) + try { + assertConsumerGroupsIsClean() + + val testTopicName = "test_topic" + prepareTopics(List(testTopicName), 2) + + val testGroupId = "test_group_id" + val testClientId = "test_client_id" + val groupInstances = Set("") + val topics = Set(testTopicName) + + // We need to disable the auto commit because after the members got removed from group, the offset commit + // will cause the member rejoining and the test will be flaky (check ConsumerCoordinator#OffsetCommitResponseHandler) + val defaultConsumerConfig = new Properties(consumerConfig) + defaultConsumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") + defaultConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, testGroupId) + defaultConsumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, testClientId) + val backgroundConsumers = prepareConsumers(groupInstances, topics, defaultConsumerConfig) try { val groupType = if (groupProtocol.equalsIgnoreCase(GroupProtocol.CONSUMER.name)) GroupType.CONSUMER else GroupType.CLASSIC // Start consumer polling threads in the background - backgroundConsumerSet.start() + backgroundConsumers.start() // Test that we can list the new group. TestUtils.waitUntilTrue(() => { val matching = client.listConsumerGroups.all.get.asScala.filter(group => - group.groupId == testGroupId && - group.groupState.get == GroupState.STABLE) + group.groupId == testGroupId && group.groupState.get == GroupState.STABLE) matching.size == 1 }, s"Expected to be able to list $testGroupId") @@ -1903,25 +1935,73 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { val options = new ListConsumerGroupsOptions().withTypes(util.Set.of(groupType)) .inGroupStates(util.Set.of(GroupState.STABLE)) val matching = client.listConsumerGroups(options).all.get.asScala.filter(group => - group.groupId == testGroupId && - group.groupState.get == GroupState.STABLE) + group.groupId == testGroupId && group.groupState.get == GroupState.STABLE) matching.size == 1 }, s"Expected to be able to list $testGroupId in group type $groupType and state Stable") TestUtils.waitUntilTrue(() => { val options = new ListConsumerGroupsOptions().inGroupStates(util.Set.of(GroupState.STABLE)) val matching = client.listConsumerGroups(options).all.get.asScala.filter(group => - group.groupId == testGroupId && - group.groupState.get == GroupState.STABLE) + group.groupId == testGroupId && group.groupState.get == GroupState.STABLE) matching.size == 1 }, s"Expected to be able to list $testGroupId in state Stable") TestUtils.waitUntilTrue(() => { val options = new ListConsumerGroupsOptions().inGroupStates(util.Set.of(GroupState.EMPTY)) - val matching = client.listConsumerGroups(options).all.get.asScala.filter( - _.groupId == testGroupId) + val matching = client.listConsumerGroups(options).all.get.asScala.filter(_.groupId == testGroupId) matching.isEmpty - }, s"Expected to find zero groups") + }, "Expected to find zero groups") + } finally { + backgroundConsumers.close() + } + } finally { + Utils.closeQuietly(client, "adminClient") + } + } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) + @MethodSource(Array("getTestGroupProtocolParametersAll")) + def testDescribeGroups(groupProtocol: String): Unit = { + val config = createConfig + client = Admin.create(config) + try { + assertConsumerGroupsIsClean() + + val testTopicName = "test_topic" + val testTopicName1 = testTopicName + "1" + val testTopicName2 = testTopicName + "2" + val testNumPartitions = 2 + prepareTopics(List(testTopicName, testTopicName1, testTopicName2), testNumPartitions) + + val producer = createProducer() + try { + producer.send(new ProducerRecord(testTopicName, 0, null, null)).get() + } finally { + Utils.closeQuietly(producer, "producer") + } + + val testGroupId = "test_group_id" + val testClientId = "test_client_id" + val testInstanceId1 = "test_instance_id_1" + val testInstanceId2 = "test_instance_id_2" + val fakeGroupId = "fake_group_id" + + // contains two static members and one dynamic member + val groupInstances = Set(testInstanceId1, testInstanceId2, "") + val topics = Set(testTopicName, testTopicName1, testTopicName2) + + // We need to disable the auto commit because after the members got removed from group, the offset commit + // will cause the member rejoining and the test will be flaky (check ConsumerCoordinator#OffsetCommitResponseHandler) + val defaultConsumerConfig = new Properties(consumerConfig) + defaultConsumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") + defaultConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, testGroupId) + defaultConsumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, testClientId) + val backgroundConsumers = prepareConsumers(groupInstances, topics, defaultConsumerConfig) + + try { + val groupType = if (groupProtocol.equalsIgnoreCase(GroupProtocol.CONSUMER.name)) GroupType.CONSUMER else GroupType.CLASSIC + // Start consumer polling threads in the background + backgroundConsumers.start() val describeWithFakeGroupResult = client.describeConsumerGroups(util.List.of(testGroupId, fakeGroupId), new DescribeConsumerGroupsOptions().includeAuthorizedOperations(true)) @@ -1940,17 +2020,14 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { assertEquals(testGroupId, testGroupDescription.groupId()) assertFalse(testGroupDescription.isSimpleConsumerGroup) - assertEquals(groupInstanceSet.size, testGroupDescription.members().size()) + assertEquals(groupInstances.size, testGroupDescription.members().size()) val members = testGroupDescription.members() members.asScala.foreach { member => assertEquals(testClientId, member.clientId) assertEquals(if (groupType == GroupType.CLASSIC) Optional.empty else Optional.of(true), member.upgraded) } val topicPartitionsByTopic = members.asScala.flatMap(_.assignment().topicPartitions().asScala).groupBy(_.topic()) - topicSet.foreach { topic => - val topicPartitions = topicPartitionsByTopic.getOrElse(topic, List.empty) - assertEquals(testNumPartitions, topicPartitions.size) - } + topics.foreach(topic => assertEquals(testNumPartitions, topicPartitionsByTopic.getOrElse(topic, List.empty).size)) val expectedOperations = AclEntry.supportedOperations(ResourceType.GROUP) assertEquals(expectedOperations, testGroupDescription.authorizedOperations()) @@ -1963,35 +2040,8 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { // Test that all() also throws GroupIdNotFoundException assertFutureThrows(classOf[GroupIdNotFoundException], describeWithFakeGroupResult.all(), s"Group $fakeGroupId not found.") - - val testTopicPart0 = new TopicPartition(testTopicName, 0) - - // Test listConsumerGroupOffsets - TestUtils.waitUntilTrue(() => { - val parts = client.listConsumerGroupOffsets(testGroupId).partitionsToOffsetAndMetadata().get() - parts.containsKey(testTopicPart0) && (parts.get(testTopicPart0).offset() == 1) - }, s"Expected the offset for partition 0 to eventually become 1.") - - // Test listConsumerGroupOffsets with requireStable true - val options = new ListConsumerGroupOffsetsOptions().requireStable(true) - var parts = client.listConsumerGroupOffsets(testGroupId, options) - .partitionsToOffsetAndMetadata().get() - assertTrue(parts.containsKey(testTopicPart0)) - assertEquals(1, parts.get(testTopicPart0).offset()) - - // Test listConsumerGroupOffsets with listConsumerGroupOffsetsSpec - val groupSpecs = util.Map.of(testGroupId, - new ListConsumerGroupOffsetsSpec().topicPartitions(util.Set.of(new TopicPartition(testTopicName, 0)))) - parts = client.listConsumerGroupOffsets(groupSpecs).partitionsToOffsetAndMetadata().get() - assertTrue(parts.containsKey(testTopicPart0)) - assertEquals(1, parts.get(testTopicPart0).offset()) - - // Test listConsumerGroupOffsets with listConsumerGroupOffsetsSpec and requireStable option - parts = client.listConsumerGroupOffsets(groupSpecs, options).partitionsToOffsetAndMetadata().get() - assertTrue(parts.containsKey(testTopicPart0)) - assertEquals(1, parts.get(testTopicPart0).offset()) } finally { - backgroundConsumerSet.close() + backgroundConsumers.close() } } finally { Utils.closeQuietly(client, "adminClient") @@ -2089,29 +2139,15 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { val config = createConfig client = Admin.create(config) try { - // Verify that initially there are no consumer groups to list. - val list1 = client.listConsumerGroups() - assertEquals(0, list1.all().get().size()) - assertEquals(0, list1.errors().get().size()) - assertEquals(0, list1.valid().get().size()) + assertConsumerGroupsIsClean() + val testTopicName = "test_topic" val testTopicName1 = testTopicName + "1" val testTopicName2 = testTopicName + "2" val testNumPartitions = 2 - client.createTopics(util.List.of( - new NewTopic(testTopicName, testNumPartitions, 1.toShort), - new NewTopic(testTopicName1, testNumPartitions, 1.toShort), - new NewTopic(testTopicName2, testNumPartitions, 1.toShort) - )).all().get() - waitForTopics(client, List(testTopicName, testTopicName1, testTopicName2), List()) - - val producer = createProducer() - try { - producer.send(new ProducerRecord(testTopicName, 0, null, null)).get() - } finally { - Utils.closeQuietly(producer, "producer") - } + prepareTopics(List(testTopicName, testTopicName1, testTopicName2), testNumPartitions) + prepareRecords(testTopicName) val testGroupId = "test_group_id" val testClientId = "test_client_id" @@ -2291,28 +2327,15 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { client = Admin.create(config) try { // Verify that initially there are no consumer groups to list. - val list1 = client.listConsumerGroups() - assertEquals(0, list1.all().get().size()) - assertEquals(0, list1.errors().get().size()) - assertEquals(0, list1.valid().get().size()) + assertConsumerGroupsIsClean() val testTopicName = "test_topic" val testTopicName1 = testTopicName + "1" val testTopicName2 = testTopicName + "2" val testNumPartitions = 2 - client.createTopics(util.List.of( - new NewTopic(testTopicName, testNumPartitions, 1.toShort), - new NewTopic(testTopicName1, testNumPartitions, 1.toShort), - new NewTopic(testTopicName2, testNumPartitions, 1.toShort) - )).all().get() - waitForTopics(client, List(testTopicName, testTopicName1, testTopicName2), List()) + prepareTopics(List(testTopicName, testTopicName1, testTopicName2), testNumPartitions) - val producer = createProducer() - try { - producer.send(new ProducerRecord(testTopicName, 0, null, null)).get() - } finally { - Utils.closeQuietly(producer, "producer") - } + prepareRecords(testTopicName) val testGroupId = "test_group_id" val testClientId = "test_client_id" @@ -2461,12 +2484,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { new NewTopic(testTopicName, 1, 1.toShort))).all().get() waitForTopics(client, List(testTopicName), List()) - val producer = createProducer() - try { - producer.send(new ProducerRecord(testTopicName, 0, null, null)).get() - } finally { - Utils.closeQuietly(producer, "producer") - } + prepareRecords(testTopicName) val newConsumerConfig = new Properties(consumerConfig) newConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, testGroupId) @@ -2511,6 +2529,43 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } } + private def prepareTopics(topics: List[String], numberOfPartitions: Int): Unit = { + client.createTopics(topics.map(topic => new NewTopic(topic, numberOfPartitions, 1.toShort)).asJava).all().get() + waitForTopics(client, topics, List()) + } + + private def prepareRecords(testTopicName: String) = { + val producer = createProducer() + try { + producer.send(new ProducerRecord(testTopicName, 0, null, null)).get() + } finally { + Utils.closeQuietly(producer, "producer") + } + } + + private def prepareConsumers(groupInstanceSet: Set[String], topicSet: Set[String], defaultConsumerConfig: Properties) = { + val backgroundConsumerSet = new BackgroundConsumerSet(defaultConsumerConfig) + groupInstanceSet.zip(topicSet).foreach { case (groupInstanceId, topic) => + val configOverrides = new Properties() + if (groupInstanceId != "") { + // static member + configOverrides.setProperty(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, groupInstanceId) + } + backgroundConsumerSet.addConsumer(topic, configOverrides) + } + backgroundConsumerSet + } + + /** + * Verify that initially there are no consumer groups to list. + */ + private def assertConsumerGroupsIsClean(): Unit = { + val listResult = client.listConsumerGroups() + assertEquals(0, listResult.all().get().size()) + assertEquals(0, listResult.errors().get().size()) + assertEquals(0, listResult.valid().get().size()) + } + @Test def testListGroups(): Unit = { val classicGroupId = "classic_group_id"