KAFKA-17910 Create integration tests for Admin.listGroups and Admin.describeClassicGroups (#17712)

Reviewers: Andrew Schofield <aschofield@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
PoAn Yang 2024-11-18 16:35:48 +08:00 committed by GitHub
parent a592912ec9
commit 078d34f39d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 287 additions and 0 deletions

View File

@ -2087,6 +2087,135 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
}
}
@ParameterizedTest
@ValueSource(strings = Array("kraft+kip932"))
def testListGroups(quorum: String): Unit = {
val classicGroupId = "classic_group_id"
val consumerGroupId = "consumer_group_id"
val shareGroupId = "share_group_id"
val simpleGroupId = "simple_group_id"
val testTopicName = "test_topic"
consumerConfig.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name)
val classicGroupConfig = new Properties(consumerConfig)
classicGroupConfig.put(ConsumerConfig.GROUP_ID_CONFIG, classicGroupId)
val classicGroup = createConsumer(configOverrides = classicGroupConfig)
consumerConfig.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name)
val consumerGroupConfig = new Properties(consumerConfig)
consumerGroupConfig.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId)
val consumerGroup = createConsumer(configOverrides = consumerGroupConfig)
val shareGroupConfig = new Properties(consumerConfig)
shareGroupConfig.put(ConsumerConfig.GROUP_ID_CONFIG, shareGroupId)
val shareGroup = createShareConsumer(configOverrides = shareGroupConfig)
val config = createConfig
client = Admin.create(config)
try {
client.createTopics(Collections.singleton(
new NewTopic(testTopicName, 1, 1.toShort)
)).all().get()
waitForTopics(client, List(testTopicName), List())
val topicPartition = new TopicPartition(testTopicName, 0)
classicGroup.subscribe(Collections.singleton(testTopicName))
classicGroup.poll(JDuration.ofMillis(1000))
consumerGroup.subscribe(Collections.singleton(testTopicName))
consumerGroup.poll(JDuration.ofMillis(1000))
shareGroup.subscribe(Collections.singleton(testTopicName))
shareGroup.poll(JDuration.ofMillis(1000))
val alterConsumerGroupOffsetsResult = client.alterConsumerGroupOffsets(simpleGroupId,
Collections.singletonMap(topicPartition, new OffsetAndMetadata(0L)))
assertNull(alterConsumerGroupOffsetsResult.all().get())
assertNull(alterConsumerGroupOffsetsResult.partitionResult(topicPartition).get())
TestUtils.waitUntilTrue(() => {
val groups = client.listGroups().all().get()
groups.size() == 4
}, "Expected to find all groups")
val classicGroupListing = new GroupListing(classicGroupId, Optional.of(GroupType.CLASSIC), "consumer")
val consumerGroupListing = new GroupListing(consumerGroupId, Optional.of(GroupType.CONSUMER), "consumer")
val shareGroupListing = new GroupListing(shareGroupId, Optional.of(GroupType.SHARE), "share")
val simpleGroupListing = new GroupListing(simpleGroupId, Optional.of(GroupType.CLASSIC), "")
var listGroupsResult = client.listGroups()
assertTrue(listGroupsResult.errors().get().isEmpty)
assertEquals(Set(classicGroupListing, simpleGroupListing, consumerGroupListing, shareGroupListing), listGroupsResult.all().get().asScala.toSet)
assertEquals(Set(classicGroupListing, simpleGroupListing, consumerGroupListing, shareGroupListing), listGroupsResult.valid().get().asScala.toSet)
listGroupsResult = client.listGroups(new ListGroupsOptions().withTypes(java.util.Set.of(GroupType.CLASSIC)))
assertTrue(listGroupsResult.errors().get().isEmpty)
assertEquals(Set(classicGroupListing, simpleGroupListing), listGroupsResult.all().get().asScala.toSet)
assertEquals(Set(classicGroupListing, simpleGroupListing), listGroupsResult.valid().get().asScala.toSet)
listGroupsResult = client.listGroups(new ListGroupsOptions().withTypes(java.util.Set.of(GroupType.CONSUMER)))
assertTrue(listGroupsResult.errors().get().isEmpty)
assertEquals(Set(consumerGroupListing), listGroupsResult.all().get().asScala.toSet)
assertEquals(Set(consumerGroupListing), listGroupsResult.valid().get().asScala.toSet)
listGroupsResult = client.listGroups(new ListGroupsOptions().withTypes(java.util.Set.of(GroupType.SHARE)))
assertTrue(listGroupsResult.errors().get().isEmpty)
assertEquals(Set(shareGroupListing), listGroupsResult.all().get().asScala.toSet)
assertEquals(Set(shareGroupListing), listGroupsResult.valid().get().asScala.toSet)
} finally {
Utils.closeQuietly(classicGroup, "classicGroup")
Utils.closeQuietly(consumerGroup, "consumerGroup")
Utils.closeQuietly(shareGroup, "shareGroup")
Utils.closeQuietly(client, "adminClient")
}
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
def testDescribeClassicGroups(quorum: String, groupProtocol: String): Unit = {
val classicGroupId = "classic_group_id"
val simpleGroupId = "simple_group_id"
val testTopicName = "test_topic"
val classicGroupConfig = new Properties(consumerConfig)
classicGroupConfig.put(ConsumerConfig.GROUP_ID_CONFIG, classicGroupId)
val classicGroup = createConsumer(configOverrides = classicGroupConfig)
val config = createConfig
client = Admin.create(config)
try {
client.createTopics(Collections.singleton(
new NewTopic(testTopicName, 1, 1.toShort)
)).all().get()
waitForTopics(client, List(testTopicName), List())
val topicPartition = new TopicPartition(testTopicName, 0)
classicGroup.subscribe(Collections.singleton(testTopicName))
classicGroup.poll(JDuration.ofMillis(1000))
val alterConsumerGroupOffsetsResult = client.alterConsumerGroupOffsets(simpleGroupId,
Collections.singletonMap(topicPartition, new OffsetAndMetadata(0L)))
assertNull(alterConsumerGroupOffsetsResult.all().get())
assertNull(alterConsumerGroupOffsetsResult.partitionResult(topicPartition).get())
val groupIds = Seq(simpleGroupId, classicGroupId)
TestUtils.waitUntilTrue(() => {
val groups = client.describeClassicGroups(groupIds.asJavaCollection).all().get()
groups.size() == 2
}, "Expected to find all groups")
val classicConsumers = client.describeClassicGroups(groupIds.asJavaCollection).all().get()
assertNotNull(classicConsumers.get(classicGroupId))
assertEquals(classicGroupId, classicConsumers.get(classicGroupId).groupId())
assertEquals("consumer", classicConsumers.get(classicGroupId).protocol())
assertNotNull(classicConsumers.get(simpleGroupId))
assertEquals(simpleGroupId, classicConsumers.get(simpleGroupId).groupId())
assertTrue(classicConsumers.get(simpleGroupId).protocol().isEmpty)
} finally {
Utils.closeQuietly(classicGroup, "classicGroup")
Utils.closeQuietly(client, "adminClient")
}
}
@ParameterizedTest
@ValueSource(strings = Array("kraft+kip932"))
def testShareGroups(quorum: String): Unit = {

View File

@ -17,27 +17,52 @@
package org.apache.kafka.tools;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AdminClientTestUtils;
import org.apache.kafka.clients.admin.AlterConsumerGroupOffsetsResult;
import org.apache.kafka.clients.admin.GroupListing;
import org.apache.kafka.clients.admin.ListGroupsResult;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.GroupProtocol;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.KafkaShareConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.GroupType;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTestExtensions;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.ExtendWith;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@Timeout(value = 60)
@ExtendWith(ClusterTestExtensions.class)
public class GroupsCommandTest {
private final String bootstrapServer = "localhost:9092";
@ -357,6 +382,122 @@ public class GroupsCommandTest {
)));
}
@SuppressWarnings("NPathComplexity")
@ClusterTest(
serverProperties = {
@ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer,share"),
@ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
@ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
}
)
public void testGroupCommand(ClusterInstance clusterInstance) throws Exception {
String topic = "topic";
String classicGroupId = "classic_group";
String consumerGroupId = "consumer_group";
String shareGroupId = "share_group";
String simpleGroupId = "simple_group";
clusterInstance.createTopic("topic", 1, (short) 1);
TopicPartition topicPartition = new TopicPartition(topic, 0);
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers());
try (KafkaConsumer<String, String> classicGroup = createKafkaConsumer(clusterInstance, classicGroupId, GroupProtocol.CLASSIC);
KafkaConsumer<String, String> consumerGroup = createKafkaConsumer(clusterInstance, consumerGroupId, GroupProtocol.CONSUMER);
KafkaShareConsumer<String, String> shareGroup = createKafkaShareConsumer(clusterInstance, shareGroupId);
Admin admin = clusterInstance.admin();
GroupsCommand.GroupsService groupsCommand = new GroupsCommand.GroupsService(props)
) {
classicGroup.subscribe(List.of(topic));
classicGroup.poll(Duration.ofMillis(1000));
consumerGroup.subscribe(List.of(topic));
consumerGroup.poll(Duration.ofMillis(1000));
shareGroup.subscribe(List.of(topic));
shareGroup.poll(Duration.ofMillis(1000));
AlterConsumerGroupOffsetsResult result = admin.alterConsumerGroupOffsets(simpleGroupId, Map.of(topicPartition, new OffsetAndMetadata(0L)));
assertNull(result.all().get());
TestUtils.waitForCondition(() -> {
Map.Entry<String, String> res = ToolsTestUtils.grabConsoleOutputAndError(() ->
assertDoesNotThrow(() -> groupsCommand.listGroups(new GroupsCommand.GroupsCommandOptions(
List.of("--bootstrap-server", clusterInstance.bootstrapServers(), "--list").toArray(new String[0])))));
if (res.getKey().split("\n").length == 5 && res.getValue().isEmpty()) {
assertCapturedListOutput(res.getKey(),
new String[]{classicGroupId, "Classic", "consumer"},
new String[]{consumerGroupId, "Consumer", "consumer"},
new String[]{simpleGroupId, "Classic"},
new String[]{shareGroupId, "Share", "share"});
return true;
}
return false;
}, "Waiting for listing groups to return all groups");
TestUtils.waitForCondition(() -> {
Map.Entry<String, String> res = ToolsTestUtils.grabConsoleOutputAndError(() ->
assertDoesNotThrow(() -> groupsCommand.listGroups(new GroupsCommand.GroupsCommandOptions(
List.of("--bootstrap-server", clusterInstance.bootstrapServers(), "--list", "--consumer").toArray(new String[0])))));
if (res.getKey().split("\n").length == 4 && res.getValue().isEmpty()) {
assertCapturedListOutput(res.getKey(),
new String[]{classicGroupId, "Classic", "consumer"},
new String[]{consumerGroupId, "Consumer", "consumer"},
new String[]{simpleGroupId, "Classic"});
return true;
}
return false;
}, "Waiting for listing groups to return consumer protocol groups");
TestUtils.waitForCondition(() -> {
Map.Entry<String, String> res = ToolsTestUtils.grabConsoleOutputAndError(() ->
assertDoesNotThrow(() -> groupsCommand.listGroups(new GroupsCommand.GroupsCommandOptions(
List.of("--bootstrap-server", clusterInstance.bootstrapServers(), "--list", "--group-type", "classic").toArray(new String[0])))));
if (res.getKey().split("\n").length == 3 && res.getValue().isEmpty()) {
assertCapturedListOutput(res.getKey(),
new String[]{classicGroupId, "Classic", "consumer"},
new String[]{simpleGroupId, "Classic"});
return true;
}
return false;
}, "Waiting for listing groups to return classic type groups");
TestUtils.waitForCondition(() -> {
Map.Entry<String, String> res = ToolsTestUtils.grabConsoleOutputAndError(() ->
assertDoesNotThrow(() -> groupsCommand.listGroups(new GroupsCommand.GroupsCommandOptions(
List.of("--bootstrap-server", clusterInstance.bootstrapServers(), "--list", "--group-type", "consumer").toArray(new String[0])))));
if (res.getKey().split("\n").length == 2 && res.getValue().isEmpty()) {
assertCapturedListOutput(res.getKey(),
new String[]{consumerGroupId, "Consumer", "consumer"});
return true;
}
return false;
}, "Waiting for listing groups to return consumer type groups");
TestUtils.waitForCondition(() -> {
Map.Entry<String, String> res = ToolsTestUtils.grabConsoleOutputAndError(() ->
assertDoesNotThrow(() -> groupsCommand.listGroups(new GroupsCommand.GroupsCommandOptions(
List.of("--bootstrap-server", clusterInstance.bootstrapServers(), "--list", "--group-type", "share").toArray(new String[0])))));
if (res.getKey().split("\n").length == 2 && res.getValue().isEmpty()) {
assertCapturedListOutput(res.getKey(),
new String[]{shareGroupId, "Share", "share"});
return true;
}
return false;
}, "Waiting for listing groups to return share type groups");
TestUtils.waitForCondition(() -> {
Map.Entry<String, String> res = ToolsTestUtils.grabConsoleOutputAndError(() ->
assertDoesNotThrow(() -> groupsCommand.listGroups(new GroupsCommand.GroupsCommandOptions(
List.of("--bootstrap-server", clusterInstance.bootstrapServers(), "--list", "--share").toArray(new String[0])))));
if (res.getKey().split("\n").length == 2 && res.getValue().isEmpty()) {
assertCapturedListOutput(res.getKey(),
new String[]{shareGroupId, "Share", "share"});
return true;
}
return false;
}, "Waiting for listing groups to return share type groups");
}
}
private void assertInitializeInvalidOptionsExitCode(int expected, String[] options) {
Exit.setExitProcedure((exitCode, message) -> {
assertEquals(expected, exitCode);
@ -378,4 +519,21 @@ public class GroupsCommandTest {
assertEquals(String.join(",", line), String.join(",", capturedLines[i++].split(" +")));
}
}
private KafkaConsumer<String, String> createKafkaConsumer(ClusterInstance clusterInstance, String groupId, GroupProtocol groupProtocol) {
return new KafkaConsumer<>(Map.of(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers(),
ConsumerConfig.GROUP_ID_CONFIG, groupId,
ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name,
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName(),
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()));
}
private KafkaShareConsumer<String, String> createKafkaShareConsumer(ClusterInstance clusterInstance, String groupId) {
return new KafkaShareConsumer<>(Map.of(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers(),
ConsumerConfig.GROUP_ID_CONFIG, groupId,
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName(),
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()));
}
}