From 078d34f39d73bfd4670254565f41947fdad57544 Mon Sep 17 00:00:00 2001 From: PoAn Yang Date: Mon, 18 Nov 2024 16:35:48 +0800 Subject: [PATCH] KAFKA-17910 Create integration tests for Admin.listGroups and Admin.describeClassicGroups (#17712) Reviewers: Andrew Schofield , Chia-Ping Tsai --- .../api/PlaintextAdminIntegrationTest.scala | 129 ++++++++++++++ .../apache/kafka/tools/GroupsCommandTest.java | 158 ++++++++++++++++++ 2 files changed, 287 insertions(+) diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala index 7e1103e2b42..1eba4dba0db 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala @@ -2087,6 +2087,135 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } } + @ParameterizedTest + @ValueSource(strings = Array("kraft+kip932")) + def testListGroups(quorum: String): Unit = { + val classicGroupId = "classic_group_id" + val consumerGroupId = "consumer_group_id" + val shareGroupId = "share_group_id" + val simpleGroupId = "simple_group_id" + val testTopicName = "test_topic" + + consumerConfig.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name) + val classicGroupConfig = new Properties(consumerConfig) + classicGroupConfig.put(ConsumerConfig.GROUP_ID_CONFIG, classicGroupId) + val classicGroup = createConsumer(configOverrides = classicGroupConfig) + + consumerConfig.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name) + val consumerGroupConfig = new Properties(consumerConfig) + consumerGroupConfig.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId) + val consumerGroup = createConsumer(configOverrides = consumerGroupConfig) + + val shareGroupConfig = new Properties(consumerConfig) + shareGroupConfig.put(ConsumerConfig.GROUP_ID_CONFIG, shareGroupId) + val shareGroup = createShareConsumer(configOverrides = shareGroupConfig) + + val config = createConfig + client = Admin.create(config) + try { + client.createTopics(Collections.singleton( + new NewTopic(testTopicName, 1, 1.toShort) + )).all().get() + waitForTopics(client, List(testTopicName), List()) + val topicPartition = new TopicPartition(testTopicName, 0) + + classicGroup.subscribe(Collections.singleton(testTopicName)) + classicGroup.poll(JDuration.ofMillis(1000)) + consumerGroup.subscribe(Collections.singleton(testTopicName)) + consumerGroup.poll(JDuration.ofMillis(1000)) + shareGroup.subscribe(Collections.singleton(testTopicName)) + shareGroup.poll(JDuration.ofMillis(1000)) + + val alterConsumerGroupOffsetsResult = client.alterConsumerGroupOffsets(simpleGroupId, + Collections.singletonMap(topicPartition, new OffsetAndMetadata(0L))) + assertNull(alterConsumerGroupOffsetsResult.all().get()) + assertNull(alterConsumerGroupOffsetsResult.partitionResult(topicPartition).get()) + + TestUtils.waitUntilTrue(() => { + val groups = client.listGroups().all().get() + groups.size() == 4 + }, "Expected to find all groups") + + val classicGroupListing = new GroupListing(classicGroupId, Optional.of(GroupType.CLASSIC), "consumer") + val consumerGroupListing = new GroupListing(consumerGroupId, Optional.of(GroupType.CONSUMER), "consumer") + val shareGroupListing = new GroupListing(shareGroupId, Optional.of(GroupType.SHARE), "share") + val simpleGroupListing = new GroupListing(simpleGroupId, Optional.of(GroupType.CLASSIC), "") + + var listGroupsResult = client.listGroups() + assertTrue(listGroupsResult.errors().get().isEmpty) + assertEquals(Set(classicGroupListing, simpleGroupListing, consumerGroupListing, shareGroupListing), listGroupsResult.all().get().asScala.toSet) + assertEquals(Set(classicGroupListing, simpleGroupListing, consumerGroupListing, shareGroupListing), listGroupsResult.valid().get().asScala.toSet) + + listGroupsResult = client.listGroups(new ListGroupsOptions().withTypes(java.util.Set.of(GroupType.CLASSIC))) + assertTrue(listGroupsResult.errors().get().isEmpty) + assertEquals(Set(classicGroupListing, simpleGroupListing), listGroupsResult.all().get().asScala.toSet) + assertEquals(Set(classicGroupListing, simpleGroupListing), listGroupsResult.valid().get().asScala.toSet) + + listGroupsResult = client.listGroups(new ListGroupsOptions().withTypes(java.util.Set.of(GroupType.CONSUMER))) + assertTrue(listGroupsResult.errors().get().isEmpty) + assertEquals(Set(consumerGroupListing), listGroupsResult.all().get().asScala.toSet) + assertEquals(Set(consumerGroupListing), listGroupsResult.valid().get().asScala.toSet) + + listGroupsResult = client.listGroups(new ListGroupsOptions().withTypes(java.util.Set.of(GroupType.SHARE))) + assertTrue(listGroupsResult.errors().get().isEmpty) + assertEquals(Set(shareGroupListing), listGroupsResult.all().get().asScala.toSet) + assertEquals(Set(shareGroupListing), listGroupsResult.valid().get().asScala.toSet) + } finally { + Utils.closeQuietly(classicGroup, "classicGroup") + Utils.closeQuietly(consumerGroup, "consumerGroup") + Utils.closeQuietly(shareGroup, "shareGroup") + Utils.closeQuietly(client, "adminClient") + } + } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly")) + def testDescribeClassicGroups(quorum: String, groupProtocol: String): Unit = { + val classicGroupId = "classic_group_id" + val simpleGroupId = "simple_group_id" + val testTopicName = "test_topic" + + val classicGroupConfig = new Properties(consumerConfig) + classicGroupConfig.put(ConsumerConfig.GROUP_ID_CONFIG, classicGroupId) + val classicGroup = createConsumer(configOverrides = classicGroupConfig) + + val config = createConfig + client = Admin.create(config) + try { + client.createTopics(Collections.singleton( + new NewTopic(testTopicName, 1, 1.toShort) + )).all().get() + waitForTopics(client, List(testTopicName), List()) + val topicPartition = new TopicPartition(testTopicName, 0) + + classicGroup.subscribe(Collections.singleton(testTopicName)) + classicGroup.poll(JDuration.ofMillis(1000)) + + val alterConsumerGroupOffsetsResult = client.alterConsumerGroupOffsets(simpleGroupId, + Collections.singletonMap(topicPartition, new OffsetAndMetadata(0L))) + assertNull(alterConsumerGroupOffsetsResult.all().get()) + assertNull(alterConsumerGroupOffsetsResult.partitionResult(topicPartition).get()) + + val groupIds = Seq(simpleGroupId, classicGroupId) + TestUtils.waitUntilTrue(() => { + val groups = client.describeClassicGroups(groupIds.asJavaCollection).all().get() + groups.size() == 2 + }, "Expected to find all groups") + + val classicConsumers = client.describeClassicGroups(groupIds.asJavaCollection).all().get() + assertNotNull(classicConsumers.get(classicGroupId)) + assertEquals(classicGroupId, classicConsumers.get(classicGroupId).groupId()) + assertEquals("consumer", classicConsumers.get(classicGroupId).protocol()) + + assertNotNull(classicConsumers.get(simpleGroupId)) + assertEquals(simpleGroupId, classicConsumers.get(simpleGroupId).groupId()) + assertTrue(classicConsumers.get(simpleGroupId).protocol().isEmpty) + } finally { + Utils.closeQuietly(classicGroup, "classicGroup") + Utils.closeQuietly(client, "adminClient") + } + } + @ParameterizedTest @ValueSource(strings = Array("kraft+kip932")) def testShareGroups(quorum: String): Unit = { diff --git a/tools/src/test/java/org/apache/kafka/tools/GroupsCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/GroupsCommandTest.java index cdea5583653..b5a884d9e49 100644 --- a/tools/src/test/java/org/apache/kafka/tools/GroupsCommandTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/GroupsCommandTest.java @@ -17,27 +17,52 @@ package org.apache.kafka.tools; import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.AdminClientTestUtils; +import org.apache.kafka.clients.admin.AlterConsumerGroupOffsetsResult; import org.apache.kafka.clients.admin.GroupListing; import org.apache.kafka.clients.admin.ListGroupsResult; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.GroupProtocol; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.KafkaShareConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.GroupType; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.test.api.ClusterConfigProperty; +import org.apache.kafka.common.test.api.ClusterInstance; +import org.apache.kafka.common.test.api.ClusterTest; +import org.apache.kafka.common.test.api.ClusterTestExtensions; import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.coordinator.group.GroupCoordinatorConfig; +import org.apache.kafka.test.TestUtils; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.api.extension.ExtendWith; +import java.time.Duration; +import java.util.List; +import java.util.Map; import java.util.Optional; +import java.util.Properties; import java.util.concurrent.ExecutionException; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +@Timeout(value = 60) +@ExtendWith(ClusterTestExtensions.class) public class GroupsCommandTest { private final String bootstrapServer = "localhost:9092"; @@ -357,6 +382,122 @@ public class GroupsCommandTest { ))); } + @SuppressWarnings("NPathComplexity") + @ClusterTest( + serverProperties = { + @ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer,share"), + @ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), + @ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1") + } + ) + public void testGroupCommand(ClusterInstance clusterInstance) throws Exception { + String topic = "topic"; + String classicGroupId = "classic_group"; + String consumerGroupId = "consumer_group"; + String shareGroupId = "share_group"; + String simpleGroupId = "simple_group"; + clusterInstance.createTopic("topic", 1, (short) 1); + TopicPartition topicPartition = new TopicPartition(topic, 0); + + Properties props = new Properties(); + props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers()); + + try (KafkaConsumer classicGroup = createKafkaConsumer(clusterInstance, classicGroupId, GroupProtocol.CLASSIC); + KafkaConsumer consumerGroup = createKafkaConsumer(clusterInstance, consumerGroupId, GroupProtocol.CONSUMER); + KafkaShareConsumer shareGroup = createKafkaShareConsumer(clusterInstance, shareGroupId); + Admin admin = clusterInstance.admin(); + GroupsCommand.GroupsService groupsCommand = new GroupsCommand.GroupsService(props) + ) { + classicGroup.subscribe(List.of(topic)); + classicGroup.poll(Duration.ofMillis(1000)); + consumerGroup.subscribe(List.of(topic)); + consumerGroup.poll(Duration.ofMillis(1000)); + shareGroup.subscribe(List.of(topic)); + shareGroup.poll(Duration.ofMillis(1000)); + + AlterConsumerGroupOffsetsResult result = admin.alterConsumerGroupOffsets(simpleGroupId, Map.of(topicPartition, new OffsetAndMetadata(0L))); + assertNull(result.all().get()); + + TestUtils.waitForCondition(() -> { + Map.Entry res = ToolsTestUtils.grabConsoleOutputAndError(() -> + assertDoesNotThrow(() -> groupsCommand.listGroups(new GroupsCommand.GroupsCommandOptions( + List.of("--bootstrap-server", clusterInstance.bootstrapServers(), "--list").toArray(new String[0]))))); + if (res.getKey().split("\n").length == 5 && res.getValue().isEmpty()) { + assertCapturedListOutput(res.getKey(), + new String[]{classicGroupId, "Classic", "consumer"}, + new String[]{consumerGroupId, "Consumer", "consumer"}, + new String[]{simpleGroupId, "Classic"}, + new String[]{shareGroupId, "Share", "share"}); + return true; + } + return false; + }, "Waiting for listing groups to return all groups"); + + TestUtils.waitForCondition(() -> { + Map.Entry res = ToolsTestUtils.grabConsoleOutputAndError(() -> + assertDoesNotThrow(() -> groupsCommand.listGroups(new GroupsCommand.GroupsCommandOptions( + List.of("--bootstrap-server", clusterInstance.bootstrapServers(), "--list", "--consumer").toArray(new String[0]))))); + if (res.getKey().split("\n").length == 4 && res.getValue().isEmpty()) { + assertCapturedListOutput(res.getKey(), + new String[]{classicGroupId, "Classic", "consumer"}, + new String[]{consumerGroupId, "Consumer", "consumer"}, + new String[]{simpleGroupId, "Classic"}); + return true; + } + return false; + }, "Waiting for listing groups to return consumer protocol groups"); + + TestUtils.waitForCondition(() -> { + Map.Entry res = ToolsTestUtils.grabConsoleOutputAndError(() -> + assertDoesNotThrow(() -> groupsCommand.listGroups(new GroupsCommand.GroupsCommandOptions( + List.of("--bootstrap-server", clusterInstance.bootstrapServers(), "--list", "--group-type", "classic").toArray(new String[0]))))); + if (res.getKey().split("\n").length == 3 && res.getValue().isEmpty()) { + assertCapturedListOutput(res.getKey(), + new String[]{classicGroupId, "Classic", "consumer"}, + new String[]{simpleGroupId, "Classic"}); + return true; + } + return false; + }, "Waiting for listing groups to return classic type groups"); + + TestUtils.waitForCondition(() -> { + Map.Entry res = ToolsTestUtils.grabConsoleOutputAndError(() -> + assertDoesNotThrow(() -> groupsCommand.listGroups(new GroupsCommand.GroupsCommandOptions( + List.of("--bootstrap-server", clusterInstance.bootstrapServers(), "--list", "--group-type", "consumer").toArray(new String[0]))))); + if (res.getKey().split("\n").length == 2 && res.getValue().isEmpty()) { + assertCapturedListOutput(res.getKey(), + new String[]{consumerGroupId, "Consumer", "consumer"}); + return true; + } + return false; + }, "Waiting for listing groups to return consumer type groups"); + + TestUtils.waitForCondition(() -> { + Map.Entry res = ToolsTestUtils.grabConsoleOutputAndError(() -> + assertDoesNotThrow(() -> groupsCommand.listGroups(new GroupsCommand.GroupsCommandOptions( + List.of("--bootstrap-server", clusterInstance.bootstrapServers(), "--list", "--group-type", "share").toArray(new String[0]))))); + if (res.getKey().split("\n").length == 2 && res.getValue().isEmpty()) { + assertCapturedListOutput(res.getKey(), + new String[]{shareGroupId, "Share", "share"}); + return true; + } + return false; + }, "Waiting for listing groups to return share type groups"); + + TestUtils.waitForCondition(() -> { + Map.Entry res = ToolsTestUtils.grabConsoleOutputAndError(() -> + assertDoesNotThrow(() -> groupsCommand.listGroups(new GroupsCommand.GroupsCommandOptions( + List.of("--bootstrap-server", clusterInstance.bootstrapServers(), "--list", "--share").toArray(new String[0]))))); + if (res.getKey().split("\n").length == 2 && res.getValue().isEmpty()) { + assertCapturedListOutput(res.getKey(), + new String[]{shareGroupId, "Share", "share"}); + return true; + } + return false; + }, "Waiting for listing groups to return share type groups"); + } + } + private void assertInitializeInvalidOptionsExitCode(int expected, String[] options) { Exit.setExitProcedure((exitCode, message) -> { assertEquals(expected, exitCode); @@ -378,4 +519,21 @@ public class GroupsCommandTest { assertEquals(String.join(",", line), String.join(",", capturedLines[i++].split(" +"))); } } + + private KafkaConsumer createKafkaConsumer(ClusterInstance clusterInstance, String groupId, GroupProtocol groupProtocol) { + return new KafkaConsumer<>(Map.of( + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers(), + ConsumerConfig.GROUP_ID_CONFIG, groupId, + ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name, + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName(), + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName())); + } + + private KafkaShareConsumer createKafkaShareConsumer(ClusterInstance clusterInstance, String groupId) { + return new KafkaShareConsumer<>(Map.of( + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers(), + ConsumerConfig.GROUP_ID_CONFIG, groupId, + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName(), + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName())); + } }