mirror of https://github.com/apache/kafka.git
KAFKA-17291: Add integration test for share group list and describe (#16920)
Add an integration test for share group list and describe admin operations. Reviewers: Omnia Ibrahim <o.g.h.ibrahim@gmail.com>, Manikumar Reddy <manikumar.reddy@gmail.com>
This commit is contained in:
parent
eafb92bdce
commit
ffc865c432
|
@ -291,6 +291,7 @@ import java.util.function.Predicate;
|
|||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static org.apache.kafka.clients.admin.internals.AdminUtils.validAclOperations;
|
||||
import static org.apache.kafka.common.internals.Topic.CLUSTER_METADATA_TOPIC_NAME;
|
||||
import static org.apache.kafka.common.internals.Topic.CLUSTER_METADATA_TOPIC_PARTITION;
|
||||
import static org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData.ReassignablePartition;
|
||||
|
@ -3499,19 +3500,6 @@ public class KafkaAdminClient extends AdminClient {
|
|||
.collect(Collectors.toMap(entry -> entry.getKey().idValue, Map.Entry::getValue)));
|
||||
}
|
||||
|
||||
private Set<AclOperation> validAclOperations(final int authorizedOperations) {
|
||||
if (authorizedOperations == MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED) {
|
||||
return null;
|
||||
}
|
||||
return Utils.from32BitField(authorizedOperations)
|
||||
.stream()
|
||||
.map(AclOperation::fromCode)
|
||||
.filter(operation -> operation != AclOperation.UNKNOWN
|
||||
&& operation != AclOperation.ALL
|
||||
&& operation != AclOperation.ANY)
|
||||
.collect(Collectors.toSet());
|
||||
}
|
||||
|
||||
private static final class ListConsumerGroupsResults {
|
||||
private final List<Throwable> errors;
|
||||
private final HashMap<String, ConsumerGroupListing> listings;
|
||||
|
|
|
@ -82,14 +82,8 @@ public class ShareGroupListing {
|
|||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o)
|
||||
return true;
|
||||
if (o == null || getClass() != o.getClass())
|
||||
return false;
|
||||
if (!(o instanceof ShareGroupListing))
|
||||
return false;
|
||||
if (getClass() != o.getClass())
|
||||
return false;
|
||||
if (this == o) return true;
|
||||
if (!(o instanceof ShareGroupListing)) return false;
|
||||
ShareGroupListing that = (ShareGroupListing) o;
|
||||
return Objects.equals(groupId, that.groupId) &&
|
||||
Objects.equals(state, that.state);
|
||||
|
|
|
@ -0,0 +1,42 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.kafka.clients.admin.internals;
|
||||
|
||||
import org.apache.kafka.common.acl.AclOperation;
|
||||
import org.apache.kafka.common.requests.MetadataResponse;
|
||||
import org.apache.kafka.common.utils.Utils;
|
||||
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public final class AdminUtils {
|
||||
|
||||
private AdminUtils() {}
|
||||
|
||||
public static Set<AclOperation> validAclOperations(final int authorizedOperations) {
|
||||
if (authorizedOperations == MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED) {
|
||||
return null;
|
||||
}
|
||||
return Utils.from32BitField(authorizedOperations)
|
||||
.stream()
|
||||
.map(AclOperation::fromCode)
|
||||
.filter(operation -> operation != AclOperation.UNKNOWN
|
||||
&& operation != AclOperation.ALL
|
||||
&& operation != AclOperation.ANY)
|
||||
.collect(Collectors.toSet());
|
||||
}
|
||||
}
|
|
@ -40,9 +40,7 @@ import org.apache.kafka.common.requests.DescribeGroupsRequest;
|
|||
import org.apache.kafka.common.requests.DescribeGroupsResponse;
|
||||
import org.apache.kafka.common.requests.FindCoordinatorRequest;
|
||||
import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
|
||||
import org.apache.kafka.common.requests.MetadataResponse;
|
||||
import org.apache.kafka.common.utils.LogContext;
|
||||
import org.apache.kafka.common.utils.Utils;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
|
||||
|
@ -58,6 +56,8 @@ import java.util.Optional;
|
|||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.apache.kafka.clients.admin.internals.AdminUtils.validAclOperations;
|
||||
|
||||
public class DescribeConsumerGroupsHandler implements AdminApiHandler<CoordinatorKey, ConsumerGroupDescription> {
|
||||
|
||||
private final boolean includeAuthorizedOperations;
|
||||
|
@ -367,18 +367,4 @@ public class DescribeConsumerGroupsHandler implements AdminApiHandler<Coordinato
|
|||
failed.put(groupId, error.exception(errorMsg));
|
||||
}
|
||||
}
|
||||
|
||||
private Set<AclOperation> validAclOperations(final int authorizedOperations) {
|
||||
if (authorizedOperations == MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED) {
|
||||
return null;
|
||||
}
|
||||
return Utils.from32BitField(authorizedOperations)
|
||||
.stream()
|
||||
.map(AclOperation::fromCode)
|
||||
.filter(operation -> operation != AclOperation.UNKNOWN
|
||||
&& operation != AclOperation.ALL
|
||||
&& operation != AclOperation.ANY)
|
||||
.collect(Collectors.toSet());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -29,16 +29,15 @@ import org.apache.kafka.common.protocol.Errors;
|
|||
import org.apache.kafka.common.requests.AbstractResponse;
|
||||
import org.apache.kafka.common.requests.FindCoordinatorRequest;
|
||||
import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
|
||||
import org.apache.kafka.common.requests.MetadataResponse;
|
||||
import org.apache.kafka.common.requests.ShareGroupDescribeRequest;
|
||||
import org.apache.kafka.common.requests.ShareGroupDescribeResponse;
|
||||
import org.apache.kafka.common.utils.LogContext;
|
||||
import org.apache.kafka.common.utils.Utils;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
|
@ -46,6 +45,8 @@ import java.util.Map;
|
|||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.apache.kafka.clients.admin.internals.AdminUtils.validAclOperations;
|
||||
|
||||
public class DescribeShareGroupsHandler extends AdminApiHandler.Batched<CoordinatorKey, ShareGroupDescription> {
|
||||
|
||||
private final boolean includeAuthorizedOperations;
|
||||
|
@ -109,7 +110,7 @@ public class DescribeShareGroupsHandler extends AdminApiHandler.Batched<Coordina
|
|||
CoordinatorKey groupIdKey = CoordinatorKey.byGroupId(describedGroup.groupId());
|
||||
Errors error = Errors.forCode(describedGroup.errorCode());
|
||||
if (error != Errors.NONE) {
|
||||
handleError(groupIdKey, error, describedGroup.errorMessage(), failed, groupsToUnmap);
|
||||
handleError(groupIdKey, describedGroup, coordinator, error, describedGroup.errorMessage(), completed, failed, groupsToUnmap);
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -147,8 +148,11 @@ public class DescribeShareGroupsHandler extends AdminApiHandler.Batched<Coordina
|
|||
|
||||
private void handleError(
|
||||
CoordinatorKey groupId,
|
||||
ShareGroupDescribeResponseData.DescribedGroup describedGroup,
|
||||
Node coordinator,
|
||||
Errors error,
|
||||
String errorMsg,
|
||||
Map<CoordinatorKey, ShareGroupDescription> completed,
|
||||
Map<CoordinatorKey, Throwable> failed,
|
||||
Set<CoordinatorKey> groupsToUnmap) {
|
||||
switch (error) {
|
||||
|
@ -173,8 +177,17 @@ public class DescribeShareGroupsHandler extends AdminApiHandler.Batched<Coordina
|
|||
break;
|
||||
|
||||
case GROUP_ID_NOT_FOUND:
|
||||
log.error("`DescribeShareGroups` request for group id {} failed because the group does not exist.", groupId.idValue);
|
||||
failed.put(groupId, error.exception(errorMsg));
|
||||
// In order to maintain compatibility with describeConsumerGroups, an unknown group ID is
|
||||
// reported as a DEAD share group, and the admin client operation did not fail
|
||||
log.debug("`DescribeShareGroups` request for group id {} failed because the group does not exist. {}",
|
||||
groupId.idValue, errorMsg != null ? errorMsg : "");
|
||||
final ShareGroupDescription shareGroupDescription =
|
||||
new ShareGroupDescription(groupId.idValue,
|
||||
Collections.emptySet(),
|
||||
ShareGroupState.DEAD,
|
||||
coordinator,
|
||||
validAclOperations(describedGroup.authorizedOperations()));
|
||||
completed.put(groupId, shareGroupDescription);
|
||||
break;
|
||||
|
||||
default:
|
||||
|
@ -182,17 +195,4 @@ public class DescribeShareGroupsHandler extends AdminApiHandler.Batched<Coordina
|
|||
failed.put(groupId, error.exception(errorMsg));
|
||||
}
|
||||
}
|
||||
|
||||
private Set<AclOperation> validAclOperations(final int authorizedOperations) {
|
||||
if (authorizedOperations == MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED) {
|
||||
return null;
|
||||
}
|
||||
return Utils.from32BitField(authorizedOperations)
|
||||
.stream()
|
||||
.map(AclOperation::fromCode)
|
||||
.filter(operation -> operation != AclOperation.UNKNOWN
|
||||
&& operation != AclOperation.ALL
|
||||
&& operation != AclOperation.ANY)
|
||||
.collect(Collectors.toSet());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,7 +18,7 @@
|
|||
package kafka.api
|
||||
|
||||
import java.time.Duration
|
||||
import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, KafkaConsumer}
|
||||
import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, KafkaConsumer, KafkaShareConsumer, ShareConsumer}
|
||||
import kafka.utils.TestUtils
|
||||
import kafka.utils.Implicits._
|
||||
|
||||
|
@ -27,11 +27,11 @@ import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}
|
|||
import kafka.server.KafkaConfig
|
||||
import kafka.integration.KafkaServerTestHarness
|
||||
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig}
|
||||
import org.apache.kafka.common.network.{ListenerName, ConnectionMode}
|
||||
import org.apache.kafka.common.network.{ConnectionMode, ListenerName}
|
||||
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, Deserializer, Serializer}
|
||||
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
|
||||
import org.apache.kafka.network.SocketServerConfigs
|
||||
import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs}
|
||||
import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ServerConfigs}
|
||||
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
|
||||
|
||||
import scala.collection.mutable
|
||||
|
@ -46,12 +46,14 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
|
|||
|
||||
val producerConfig = new Properties
|
||||
val consumerConfig = new Properties
|
||||
val shareConsumerConfig = new Properties
|
||||
val adminClientConfig = new Properties
|
||||
val superuserClientConfig = new Properties
|
||||
val serverConfig = new Properties
|
||||
val controllerConfig = new Properties
|
||||
|
||||
private val consumers = mutable.Buffer[Consumer[_, _]]()
|
||||
private val shareConsumers = mutable.Buffer[ShareConsumer[_, _]]()
|
||||
private val producers = mutable.Buffer[KafkaProducer[_, _]]()
|
||||
private val adminClients = mutable.Buffer[Admin]()
|
||||
|
||||
|
@ -73,6 +75,11 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
|
|||
cfgs.foreach(_.setProperty(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, "true"))
|
||||
cfgs.foreach(_.setProperty(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, "classic,consumer"))
|
||||
}
|
||||
if (isShareGroupTest()) {
|
||||
cfgs.foreach(_.setProperty(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, "true"))
|
||||
cfgs.foreach(_.setProperty(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, "classic,consumer,share"))
|
||||
cfgs.foreach(_.setProperty(ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG, "true"))
|
||||
}
|
||||
|
||||
if(isKRaftTest()) {
|
||||
cfgs.foreach(_.setProperty(KRaftConfigs.METADATA_LOG_DIR_CONFIG, TestUtils.tempDir().getAbsolutePath))
|
||||
|
@ -135,6 +142,7 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
|
|||
// Generate client security properties before starting the brokers in case certs are needed
|
||||
producerConfig ++= clientSecurityProps("producer")
|
||||
consumerConfig ++= clientSecurityProps("consumer")
|
||||
shareConsumerConfig ++= clientSecurityProps("shareConsumer")
|
||||
adminClientConfig ++= clientSecurityProps("adminClient")
|
||||
superuserClientConfig ++= superuserSecurityProps("superuserClient")
|
||||
|
||||
|
@ -152,6 +160,11 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
|
|||
consumerConfig.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName)
|
||||
maybeGroupProtocolSpecified(testInfo).map(groupProtocol => consumerConfig.putIfAbsent(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name))
|
||||
|
||||
shareConsumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
|
||||
shareConsumerConfig.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, "group")
|
||||
shareConsumerConfig.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName)
|
||||
shareConsumerConfig.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName)
|
||||
|
||||
adminClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
|
||||
|
||||
doSuperuserSetup(testInfo)
|
||||
|
@ -194,6 +207,19 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
|
|||
consumer
|
||||
}
|
||||
|
||||
def createShareConsumer[K, V](keyDeserializer: Deserializer[K] = new ByteArrayDeserializer,
|
||||
valueDeserializer: Deserializer[V] = new ByteArrayDeserializer,
|
||||
configOverrides: Properties = new Properties,
|
||||
configsToRemove: List[String] = List()): ShareConsumer[K, V] = {
|
||||
val props = new Properties
|
||||
props ++= shareConsumerConfig
|
||||
props ++= configOverrides
|
||||
configsToRemove.foreach(props.remove(_))
|
||||
val shareConsumer = new KafkaShareConsumer[K, V](props, keyDeserializer, valueDeserializer)
|
||||
shareConsumers += shareConsumer
|
||||
shareConsumer
|
||||
}
|
||||
|
||||
def createAdminClient(
|
||||
listenerName: ListenerName = listenerName,
|
||||
configOverrides: Properties = new Properties
|
||||
|
@ -224,10 +250,13 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
|
|||
producers.foreach(_.close(Duration.ZERO))
|
||||
consumers.foreach(_.wakeup())
|
||||
consumers.foreach(_.close(Duration.ZERO))
|
||||
shareConsumers.foreach(_.wakeup())
|
||||
shareConsumers.foreach(_.close(Duration.ZERO))
|
||||
adminClients.foreach(_.close(Duration.ZERO))
|
||||
|
||||
producers.clear()
|
||||
consumers.clear()
|
||||
shareConsumers.clear()
|
||||
adminClients.clear()
|
||||
} finally {
|
||||
super.tearDown()
|
||||
|
|
|
@ -33,7 +33,7 @@ import kafka.utils.{Log4jController, TestUtils}
|
|||
import org.apache.kafka.clients.HostResolver
|
||||
import org.apache.kafka.clients.admin.ConfigEntry.ConfigSource
|
||||
import org.apache.kafka.clients.admin._
|
||||
import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, KafkaConsumer}
|
||||
import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, KafkaConsumer, ShareConsumer}
|
||||
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
|
||||
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
|
||||
import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding, AclBindingFilter, AclOperation, AclPermissionType}
|
||||
|
@ -45,7 +45,7 @@ import org.apache.kafka.common.requests.{DeleteRecordsRequest, MetadataResponse}
|
|||
import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourceType}
|
||||
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer}
|
||||
import org.apache.kafka.common.utils.{Time, Utils}
|
||||
import org.apache.kafka.common.{ConsumerGroupState, ElectionType, IsolationLevel, TopicCollection, TopicPartition, TopicPartitionInfo, TopicPartitionReplica, Uuid}
|
||||
import org.apache.kafka.common.{ConsumerGroupState, ElectionType, IsolationLevel, ShareGroupState, TopicCollection, TopicPartition, TopicPartitionInfo, TopicPartitionReplica, Uuid}
|
||||
import org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEXT
|
||||
import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinatorConfig}
|
||||
import org.apache.kafka.network.SocketServerConfigs
|
||||
|
@ -1700,9 +1700,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
|
|||
try {
|
||||
// Verify that initially there are no consumer groups to list.
|
||||
val list1 = client.listConsumerGroups()
|
||||
assertTrue(0 == list1.all().get().size())
|
||||
assertTrue(0 == list1.errors().get().size())
|
||||
assertTrue(0 == list1.valid().get().size())
|
||||
assertEquals(0, list1.all().get().size())
|
||||
assertEquals(0, list1.errors().get().size())
|
||||
assertEquals(0, list1.valid().get().size())
|
||||
val testTopicName = "test_topic"
|
||||
val testTopicName1 = testTopicName + "1"
|
||||
val testTopicName2 = testTopicName + "2"
|
||||
|
@ -1983,6 +1983,158 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
|
|||
}
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@ValueSource(strings = Array("kraft+kip932"))
|
||||
def testShareGroups(quorum: String): Unit = {
|
||||
val testGroupId = "test_group_id"
|
||||
val testClientId = "test_client_id"
|
||||
val fakeGroupId = "fake_group_id"
|
||||
val testTopicName = "test_topic"
|
||||
val testNumPartitions = 2
|
||||
|
||||
def createProperties(): Properties = {
|
||||
val newConsumerConfig = new Properties(consumerConfig)
|
||||
newConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, testGroupId)
|
||||
newConsumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, testClientId)
|
||||
newConsumerConfig
|
||||
}
|
||||
|
||||
val consumerSet = Set(createShareConsumer(configOverrides = createProperties()))
|
||||
val topicSet = Set(testTopicName)
|
||||
|
||||
val latch = new CountDownLatch(consumerSet.size)
|
||||
|
||||
def createShareConsumerThread[K,V](consumer: ShareConsumer[K,V], topic: String): Thread = {
|
||||
new Thread {
|
||||
override def run : Unit = {
|
||||
consumer.subscribe(Collections.singleton(topic))
|
||||
try {
|
||||
while (true) {
|
||||
consumer.poll(JDuration.ofSeconds(5))
|
||||
if (latch.getCount > 0L)
|
||||
latch.countDown()
|
||||
consumer.commitSync()
|
||||
}
|
||||
} catch {
|
||||
case _: InterruptException => // Suppress the output to stderr
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
val config = createConfig
|
||||
client = Admin.create(config)
|
||||
val producer = createProducer()
|
||||
try {
|
||||
// Verify that initially there are no share groups to list.
|
||||
val list1 = client.listShareGroups()
|
||||
assertEquals(0, list1.all().get().size())
|
||||
assertEquals(0, list1.errors().get().size())
|
||||
assertEquals(0, list1.valid().get().size())
|
||||
|
||||
client.createTopics(Collections.singleton(
|
||||
new NewTopic(testTopicName, testNumPartitions, 1.toShort)
|
||||
)).all().get()
|
||||
waitForTopics(client, List(testTopicName), List())
|
||||
|
||||
producer.send(new ProducerRecord(testTopicName, 0, null, null)).get()
|
||||
|
||||
// Start consumers in a thread that will subscribe to a new group.
|
||||
val consumerThreads = consumerSet.zip(topicSet).map(zipped => createShareConsumerThread(zipped._1, zipped._2))
|
||||
|
||||
try {
|
||||
consumerThreads.foreach(_.start())
|
||||
assertTrue(latch.await(30000, TimeUnit.MILLISECONDS))
|
||||
|
||||
// Test that we can list the new group.
|
||||
TestUtils.waitUntilTrue(() => {
|
||||
client.listShareGroups.all.get.stream().filter(group =>
|
||||
group.groupId == testGroupId &&
|
||||
group.state.get == ShareGroupState.STABLE).count() == 1
|
||||
}, s"Expected to be able to list $testGroupId")
|
||||
|
||||
TestUtils.waitUntilTrue(() => {
|
||||
val options = new ListShareGroupsOptions().inStates(Collections.singleton(ShareGroupState.STABLE))
|
||||
client.listShareGroups(options).all.get.stream().filter(group =>
|
||||
group.groupId == testGroupId &&
|
||||
group.state.get == ShareGroupState.STABLE).count() == 1
|
||||
}, s"Expected to be able to list $testGroupId in state Stable")
|
||||
|
||||
TestUtils.waitUntilTrue(() => {
|
||||
val options = new ListShareGroupsOptions().inStates(Collections.singleton(ShareGroupState.EMPTY))
|
||||
client.listShareGroups(options).all.get.stream().filter(_.groupId == testGroupId).count() == 0
|
||||
}, s"Expected to find zero groups")
|
||||
|
||||
val describeWithFakeGroupResult = client.describeShareGroups(util.Arrays.asList(testGroupId, fakeGroupId),
|
||||
new DescribeShareGroupsOptions().includeAuthorizedOperations(true))
|
||||
assertEquals(2, describeWithFakeGroupResult.describedGroups().size())
|
||||
|
||||
// Test that we can get information about the test share group.
|
||||
assertTrue(describeWithFakeGroupResult.describedGroups().containsKey(testGroupId))
|
||||
assertEquals(2, describeWithFakeGroupResult.describedGroups().size())
|
||||
var testGroupDescription = describeWithFakeGroupResult.describedGroups().get(testGroupId).get()
|
||||
|
||||
assertEquals(testGroupId, testGroupDescription.groupId())
|
||||
assertEquals(consumerSet.size, testGroupDescription.members().size())
|
||||
val members = testGroupDescription.members()
|
||||
members.forEach(member => assertEquals(testClientId, member.clientId()))
|
||||
val topicPartitionsByTopic = members.asScala.flatMap(_.assignment().topicPartitions().asScala).groupBy(_.topic())
|
||||
topicSet.foreach { topic =>
|
||||
val topicPartitions = topicPartitionsByTopic.getOrElse(topic, List.empty)
|
||||
assertEquals(testNumPartitions, topicPartitions.size)
|
||||
}
|
||||
|
||||
val expectedOperations = AclEntry.supportedOperations(ResourceType.GROUP)
|
||||
assertEquals(expectedOperations, testGroupDescription.authorizedOperations())
|
||||
|
||||
// Test that the fake group is listed as dead.
|
||||
assertTrue(describeWithFakeGroupResult.describedGroups().containsKey(fakeGroupId))
|
||||
val fakeGroupDescription = describeWithFakeGroupResult.describedGroups().get(fakeGroupId).get()
|
||||
|
||||
assertEquals(fakeGroupId, fakeGroupDescription.groupId())
|
||||
assertEquals(0, fakeGroupDescription.members().size())
|
||||
assertEquals(ShareGroupState.DEAD, fakeGroupDescription.state())
|
||||
assertNull(fakeGroupDescription.authorizedOperations())
|
||||
|
||||
// Test that all() returns 2 results
|
||||
assertEquals(2, describeWithFakeGroupResult.all().get().size())
|
||||
|
||||
val describeTestGroupResult = client.describeShareGroups(Collections.singleton(testGroupId),
|
||||
new DescribeShareGroupsOptions().includeAuthorizedOperations(true))
|
||||
assertEquals(1, describeTestGroupResult.all().get().size())
|
||||
assertEquals(1, describeTestGroupResult.describedGroups().size())
|
||||
|
||||
testGroupDescription = describeTestGroupResult.describedGroups().get(testGroupId).get()
|
||||
|
||||
assertEquals(testGroupId, testGroupDescription.groupId)
|
||||
assertEquals(consumerSet.size, testGroupDescription.members().size())
|
||||
|
||||
// Describing a share group using describeConsumerGroups reports it as a DEAD consumer group
|
||||
// in the same way as a non-existent group
|
||||
val describeConsumerGroupResult = client.describeConsumerGroups(Collections.singleton(testGroupId),
|
||||
new DescribeConsumerGroupsOptions().includeAuthorizedOperations(true))
|
||||
assertEquals(1, describeConsumerGroupResult.all().get().size())
|
||||
|
||||
val deadConsumerGroupDescription = describeConsumerGroupResult.describedGroups().get(testGroupId).get()
|
||||
assertEquals(testGroupId, deadConsumerGroupDescription.groupId())
|
||||
assertEquals(0, deadConsumerGroupDescription.members().size())
|
||||
assertEquals("", deadConsumerGroupDescription.partitionAssignor())
|
||||
assertEquals(ConsumerGroupState.DEAD, deadConsumerGroupDescription.state())
|
||||
assertEquals(expectedOperations, deadConsumerGroupDescription.authorizedOperations())
|
||||
} finally {
|
||||
consumerThreads.foreach {
|
||||
case consumerThread =>
|
||||
consumerThread.interrupt()
|
||||
consumerThread.join()
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
consumerSet.foreach(consumer => Utils.closeQuietly(consumer, "consumer"))
|
||||
Utils.closeQuietly(producer, "producer")
|
||||
Utils.closeQuietly(client, "adminClient")
|
||||
}
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@ValueSource(strings = Array("zk", "kraft"))
|
||||
def testElectPreferredLeaders(quorum: String): Unit = {
|
||||
|
|
|
@ -195,6 +195,10 @@ abstract class QuorumTestHarness extends Logging {
|
|||
TestInfoUtils.isNewGroupCoordinatorEnabled(testInfo)
|
||||
}
|
||||
|
||||
def isShareGroupTest(): Boolean = {
|
||||
TestInfoUtils.isShareGroupTest(testInfo)
|
||||
}
|
||||
|
||||
def maybeGroupProtocolSpecified(testInfo: TestInfo): Option[GroupProtocol] = {
|
||||
TestInfoUtils.maybeGroupProtocolSpecified(testInfo)
|
||||
}
|
||||
|
|
|
@ -59,6 +59,10 @@ object TestInfoUtils {
|
|||
testInfo.getDisplayName.contains("kraft+kip848")
|
||||
}
|
||||
|
||||
def isShareGroupTest(testInfo: TestInfo): Boolean = {
|
||||
testInfo.getDisplayName.contains("kraft+kip932")
|
||||
}
|
||||
|
||||
def maybeGroupProtocolSpecified(testInfo: TestInfo): Option[GroupProtocol] = {
|
||||
if (testInfo.getDisplayName.contains("groupProtocol=classic"))
|
||||
Some(GroupProtocol.CLASSIC)
|
||||
|
|
Loading…
Reference in New Issue