KAFKA-17550: DescribeGroups v6 exploitation (#17706)

This PR introduces the DescribeGroups v6 API as part of KIP-1043. This adds an error message for the described groups so that it is possible to get some context on the error. It also changes the behaviour for when the group ID cannot be found but returning error code GROUP_ID_NOT_FOUND rather than NONE.

Reviewers: David Jacot <djacot@confluent.io>
This commit is contained in:
Andrew Schofield 2024-12-06 07:12:24 +00:00 committed by GitHub
parent 8864cba0e8
commit e7d986e48c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
30 changed files with 475 additions and 173 deletions

View File

@ -64,6 +64,7 @@ public class DescribeConsumerGroupsHandler implements AdminApiHandler<Coordinato
private final Logger log;
private final AdminApiLookupStrategy<CoordinatorKey> lookupStrategy;
private final Set<String> useClassicGroupApi;
private final Map<String, String> groupIdNotFoundErrorMessages;
public DescribeConsumerGroupsHandler(
boolean includeAuthorizedOperations,
@ -73,6 +74,7 @@ public class DescribeConsumerGroupsHandler implements AdminApiHandler<Coordinato
this.log = logContext.logger(DescribeConsumerGroupsHandler.class);
this.lookupStrategy = new CoordinatorStrategy(CoordinatorType.GROUP, logContext);
this.useClassicGroupApi = new HashSet<>();
this.groupIdNotFoundErrorMessages = new HashMap<>();
}
private static Set<CoordinatorKey> buildKeySet(Collection<String> groupIds) {
@ -255,7 +257,7 @@ public class DescribeConsumerGroupsHandler implements AdminApiHandler<Coordinato
handleError(
groupIdKey,
error,
null,
describedGroup.errorMessage(),
failed,
groupsToUnmap,
false
@ -354,11 +356,18 @@ public class DescribeConsumerGroupsHandler implements AdminApiHandler<Coordinato
case GROUP_ID_NOT_FOUND:
if (isConsumerGroupResponse) {
log.debug("`{}` request for group id {} failed because the group is not " +
"a new consumer group. Will retry with `DescribeGroups` API.", apiName, groupId.idValue);
"a new consumer group. Will retry with `DescribeGroups` API. {}",
apiName, groupId.idValue, errorMsg != null ? errorMsg : "");
useClassicGroupApi.add(groupId.idValue);
// The error message from the ConsumerGroupDescribe API is more informative to the user
// than the error message from the classic group API. Capture it and use it if we get the
// same error code for the classic group API also.
groupIdNotFoundErrorMessages.put(groupId.idValue, errorMsg);
} else {
log.error("`{}` request for group id {} failed because the group does not exist.", apiName, groupId.idValue);
failed.put(groupId, error.exception(errorMsg));
log.debug("`{}` request for group id {} failed because the group does not exist. {}",
apiName, groupId.idValue, errorMsg != null ? errorMsg : "");
failed.put(groupId, error.exception(groupIdNotFoundErrorMessages.getOrDefault(groupId.idValue, errorMsg)));
}
break;

View File

@ -37,7 +37,6 @@ import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@ -177,17 +176,9 @@ public class DescribeShareGroupsHandler extends AdminApiHandler.Batched<Coordina
break;
case GROUP_ID_NOT_FOUND:
// In order to maintain compatibility with describeConsumerGroups, an unknown group ID is
// reported as a DEAD share group, and the admin client operation did not fail
log.debug("`DescribeShareGroups` request for group id {} failed because the group does not exist. {}",
groupId.idValue, errorMsg != null ? errorMsg : "");
final ShareGroupDescription shareGroupDescription =
new ShareGroupDescription(groupId.idValue,
Collections.emptySet(),
GroupState.DEAD,
coordinator,
validAclOperations(describedGroup.authorizedOperations()));
completed.put(groupId, shareGroupDescription);
failed.put(groupId, error.exception(errorMsg));
break;
default:

View File

@ -110,6 +110,14 @@ public class DescribeGroupsResponse extends AbstractResponse {
DescribeGroupsResponse.UNKNOWN_PROTOCOL, Collections.emptyList(), AUTHORIZED_OPERATIONS_OMITTED);
}
public static DescribedGroup groupError(String groupId, Errors error, String errorMessage) {
return new DescribedGroup()
.setGroupId(groupId)
.setGroupState(DescribeGroupsResponse.UNKNOWN_STATE)
.setErrorCode(error.code())
.setErrorMessage(errorMessage);
}
@Override
public DescribeGroupsResponseData data() {
return data;

View File

@ -25,11 +25,13 @@
// Starting in version 4, the response will include group.instance.id info for members.
//
// Version 5 is the first flexible version.
"validVersions": "0-5",
//
// Version 6 returns error code GROUP_ID_NOT_FOUND if the group ID is not found (KIP-1043).
"validVersions": "0-6",
"flexibleVersions": "5+",
"fields": [
{ "name": "Groups", "type": "[]string", "versions": "0+", "entityType": "groupId",
"about": "The names of the groups to describe" },
"about": "The names of the groups to describe." },
{ "name": "IncludeAuthorizedOperations", "type": "bool", "versions": "3+",
"about": "Whether to include authorized operations." }
]

View File

@ -26,7 +26,9 @@
// Starting in version 4, the response will optionally include group.instance.id info for members.
//
// Version 5 is the first flexible version.
"validVersions": "0-5",
//
// Version 6 returns error code GROUP_ID_NOT_FOUND if the group ID is not found (KIP-1043).
"validVersions": "0-6",
"flexibleVersions": "5+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true,
@ -35,6 +37,8 @@
"about": "Each described group.", "fields": [
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The describe error, or 0 if there was no error." },
{ "name": "ErrorMessage", "type": "string", "versions": "6+", "nullableVersions": "6+", "default": "null",
"about": "The describe error message, or null if there was no error." },
{ "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
"about": "The group ID string." },
{ "name": "GroupState", "type": "string", "versions": "0+",

View File

@ -3799,7 +3799,13 @@ public class KafkaAdminClientTest {
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
env.kafkaClient().prepareResponse(new FindCoordinatorResponse(
new FindCoordinatorResponseData()
.setCoordinators(asList(
FindCoordinatorResponse.prepareCoordinatorResponse(Errors.NONE, GROUP_ID, env.cluster().controller()),
FindCoordinatorResponse.prepareCoordinatorResponse(Errors.NONE, "group-connect-0", env.cluster().controller())
))
));
// The first request sent will be a ConsumerGroupDescribe request. Let's
// fail it in order to fail back to using the classic version.
@ -3819,8 +3825,8 @@ public class KafkaAdminClientTest {
byte[] memberAssignmentBytes = new byte[memberAssignment.remaining()];
memberAssignment.get(memberAssignmentBytes);
DescribeGroupsResponseData group0Data = new DescribeGroupsResponseData();
group0Data.groups().add(DescribeGroupsResponse.groupMetadata(
DescribeGroupsResponseData groupData = new DescribeGroupsResponseData();
groupData.groups().add(DescribeGroupsResponse.groupMetadata(
GROUP_ID,
Errors.NONE,
"",
@ -3831,9 +3837,7 @@ public class KafkaAdminClientTest {
DescribeGroupsResponse.groupMember("1", null, "clientId1", "clientHost", memberAssignmentBytes, null)
),
Collections.emptySet()));
DescribeGroupsResponseData groupConnectData = new DescribeGroupsResponseData();
group0Data.groups().add(DescribeGroupsResponse.groupMetadata(
groupData.groups().add(DescribeGroupsResponse.groupMetadata(
"group-connect-0",
Errors.NONE,
"",
@ -3845,8 +3849,7 @@ public class KafkaAdminClientTest {
),
Collections.emptySet()));
env.kafkaClient().prepareResponse(new DescribeGroupsResponse(group0Data));
env.kafkaClient().prepareResponse(new DescribeGroupsResponse(groupConnectData));
env.kafkaClient().prepareResponse(new DescribeGroupsResponse(groupData));
Collection<String> groups = new HashSet<>();
groups.add(GROUP_ID);
@ -3854,6 +3857,72 @@ public class KafkaAdminClientTest {
final DescribeConsumerGroupsResult result = env.adminClient().describeConsumerGroups(groups);
assertEquals(2, result.describedGroups().size());
assertEquals(groups, result.describedGroups().keySet());
KafkaFuture<Map<String, ConsumerGroupDescription>> allFuture = result.all();
// This throws because the second group is a classic connect group, not a consumer group.
assertThrows(ExecutionException.class, allFuture::get);
assertTrue(allFuture.isCompletedExceptionally());
}
}
@Test
public void testDescribeConsumerGroupsGroupIdNotFound() {
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
env.kafkaClient().prepareResponse(new FindCoordinatorResponse(
new FindCoordinatorResponseData()
.setCoordinators(asList(
FindCoordinatorResponse.prepareCoordinatorResponse(Errors.NONE, GROUP_ID, env.cluster().controller()),
FindCoordinatorResponse.prepareCoordinatorResponse(Errors.NONE, "group-connect-0", env.cluster().controller())
))
));
// The first request sent will be a ConsumerGroupDescribe request. Let's
// fail it in order to fail back to using the classic version.
env.kafkaClient().prepareUnsupportedVersionResponse(
request -> request instanceof ConsumerGroupDescribeRequest);
TopicPartition myTopicPartition0 = new TopicPartition("my_topic", 0);
TopicPartition myTopicPartition1 = new TopicPartition("my_topic", 1);
TopicPartition myTopicPartition2 = new TopicPartition("my_topic", 2);
final List<TopicPartition> topicPartitions = new ArrayList<>();
topicPartitions.add(0, myTopicPartition0);
topicPartitions.add(1, myTopicPartition1);
topicPartitions.add(2, myTopicPartition2);
final ByteBuffer memberAssignment = ConsumerProtocol.serializeAssignment(new ConsumerPartitionAssignor.Assignment(topicPartitions));
byte[] memberAssignmentBytes = new byte[memberAssignment.remaining()];
memberAssignment.get(memberAssignmentBytes);
DescribeGroupsResponseData groupData = new DescribeGroupsResponseData();
groupData.groups().add(DescribeGroupsResponse.groupMetadata(
GROUP_ID,
Errors.NONE,
"",
ConsumerProtocol.PROTOCOL_TYPE,
"",
asList(
DescribeGroupsResponse.groupMember("0", null, "clientId0", "clientHost", memberAssignmentBytes, null),
DescribeGroupsResponse.groupMember("1", null, "clientId1", "clientHost", memberAssignmentBytes, null)
),
Collections.emptySet()));
groupData.groups().add(DescribeGroupsResponse.groupError(
"group-connect-0",
Errors.GROUP_ID_NOT_FOUND,
"Group group-connect-0 is not a classic group."));
env.kafkaClient().prepareResponse(new DescribeGroupsResponse(groupData));
Collection<String> groups = new HashSet<>();
groups.add(GROUP_ID);
groups.add("group-connect-0");
final DescribeConsumerGroupsResult result = env.adminClient().describeConsumerGroups(groups);
assertEquals(2, result.describedGroups().size());
assertEquals(groups, result.describedGroups().keySet());
KafkaFuture<Map<String, ConsumerGroupDescription>> allFuture = result.all();
assertThrows(ExecutionException.class, allFuture::get);
assertTrue(result.all().isCompletedExceptionally());
}
}
@ -4996,6 +5065,59 @@ public class KafkaAdminClientTest {
}
}
@Test
public void testDescribeShareGroupsGroupIdNotFound() throws Exception {
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
env.kafkaClient().prepareResponse(new FindCoordinatorResponse(
new FindCoordinatorResponseData()
.setCoordinators(asList(
FindCoordinatorResponse.prepareCoordinatorResponse(Errors.NONE, GROUP_ID, env.cluster().controller()),
FindCoordinatorResponse.prepareCoordinatorResponse(Errors.NONE, "group-1", env.cluster().controller())
))
));
ShareGroupDescribeResponseData.TopicPartitions topicPartitions = new ShareGroupDescribeResponseData.TopicPartitions()
.setTopicName("my_topic")
.setPartitions(asList(0, 1, 2));
final ShareGroupDescribeResponseData.Assignment memberAssignment = new ShareGroupDescribeResponseData.Assignment()
.setTopicPartitions(asList(topicPartitions));
ShareGroupDescribeResponseData groupData = new ShareGroupDescribeResponseData();
groupData.groups().add(new ShareGroupDescribeResponseData.DescribedGroup()
.setGroupId(GROUP_ID)
.setGroupState(GroupState.STABLE.toString())
.setMembers(asList(
new ShareGroupDescribeResponseData.Member()
.setMemberId("0")
.setClientId("clientId0")
.setClientHost("clientHost")
.setAssignment(memberAssignment),
new ShareGroupDescribeResponseData.Member()
.setMemberId("1")
.setClientId("clientId1")
.setClientHost("clientHost")
.setAssignment(memberAssignment))));
groupData.groups().add(new ShareGroupDescribeResponseData.DescribedGroup()
.setGroupId("group-1")
.setGroupState(GroupState.DEAD.toString())
.setErrorCode(Errors.GROUP_ID_NOT_FOUND.code())
.setErrorMessage("Group group-1 not found."));
env.kafkaClient().prepareResponse(new ShareGroupDescribeResponse(groupData));
Collection<String> groups = new HashSet<>();
groups.add(GROUP_ID);
groups.add("group-1");
final DescribeShareGroupsResult result = env.adminClient().describeShareGroups(groups);
assertEquals(2, result.describedGroups().size());
assertEquals(groups, result.describedGroups().keySet());
KafkaFuture<Map<String, ShareGroupDescription>> allFuture = result.all();
assertThrows(ExecutionException.class, allFuture::get);
assertTrue(result.all().isCompletedExceptionally());
}
}
@Test
public void testDescribeShareGroupsWithAuthorizedOperationsOmitted() throws Exception {
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
@ -5024,15 +5146,21 @@ public class KafkaAdminClientTest {
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
env.kafkaClient().prepareResponse(new FindCoordinatorResponse(
new FindCoordinatorResponseData()
.setCoordinators(asList(
FindCoordinatorResponse.prepareCoordinatorResponse(Errors.NONE, GROUP_ID, env.cluster().controller()),
FindCoordinatorResponse.prepareCoordinatorResponse(Errors.NONE, "group-1", env.cluster().controller())
))
));
ShareGroupDescribeResponseData.TopicPartitions topicPartitions = new ShareGroupDescribeResponseData.TopicPartitions()
.setTopicName("my_topic")
.setPartitions(asList(0, 1, 2));
final ShareGroupDescribeResponseData.Assignment memberAssignment = new ShareGroupDescribeResponseData.Assignment()
.setTopicPartitions(asList(topicPartitions));
ShareGroupDescribeResponseData group0Data = new ShareGroupDescribeResponseData();
group0Data.groups().add(new ShareGroupDescribeResponseData.DescribedGroup()
ShareGroupDescribeResponseData groupData = new ShareGroupDescribeResponseData();
groupData.groups().add(new ShareGroupDescribeResponseData.DescribedGroup()
.setGroupId(GROUP_ID)
.setGroupState(GroupState.STABLE.toString())
.setMembers(asList(
@ -5046,9 +5174,7 @@ public class KafkaAdminClientTest {
.setClientId("clientId1")
.setClientHost("clientHost")
.setAssignment(memberAssignment))));
ShareGroupDescribeResponseData group1Data = new ShareGroupDescribeResponseData();
group1Data.groups().add(new ShareGroupDescribeResponseData.DescribedGroup()
groupData.groups().add(new ShareGroupDescribeResponseData.DescribedGroup()
.setGroupId("group-1")
.setGroupState(GroupState.STABLE.toString())
.setMembers(asList(
@ -5063,8 +5189,7 @@ public class KafkaAdminClientTest {
.setClientHost("clientHost")
.setAssignment(memberAssignment))));
env.kafkaClient().prepareResponse(new ShareGroupDescribeResponse(group0Data));
env.kafkaClient().prepareResponse(new ShareGroupDescribeResponse(group1Data));
env.kafkaClient().prepareResponse(new ShareGroupDescribeResponse(groupData));
Collection<String> groups = new HashSet<>();
groups.add(GROUP_ID);
@ -5072,6 +5197,9 @@ public class KafkaAdminClientTest {
final DescribeShareGroupsResult result = env.adminClient().describeShareGroups(groups);
assertEquals(2, result.describedGroups().size());
assertEquals(groups, result.describedGroups().keySet());
KafkaFuture<Map<String, ShareGroupDescription>> allFuture = result.all();
assertDoesNotThrow(() -> allFuture.get());
assertFalse(allFuture.isCompletedExceptionally());
}
}

View File

@ -23,6 +23,7 @@ import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.GroupState;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.data.Schema;
@ -301,6 +302,7 @@ public class MirrorCheckpointTask extends SourceTask {
// sync offset to the target cluster only if the state of current consumer group is:
// (1) idle: because the consumer at target is not actively consuming the mirrored topic
// (2) dead: the new consumer that is recently created at source and never existed at target
// This case will be reported as a GroupIdNotFoundException
if (consumerGroupState == GroupState.EMPTY) {
idleConsumerGroupsOffset.put(
group,
@ -311,8 +313,13 @@ public class MirrorCheckpointTask extends SourceTask {
);
}
// new consumer upstream has state "DEAD" and will be identified during the offset sync-up
} catch (InterruptedException | ExecutionException e) {
log.error("Error querying for consumer group {} on cluster {}.", group, targetClusterAlias, e);
} catch (InterruptedException ie) {
log.error("Error querying for consumer group {} on cluster {}.", group, targetClusterAlias, ie);
} catch (ExecutionException ee) {
// check for non-existent new consumer upstream which will be identified during the offset sync-up
if (!(ee.getCause() instanceof GroupIdNotFoundException)) {
log.error("Error querying for consumer group {} on cluster {}.", group, targetClusterAlias, ee);
}
}
}
}

View File

@ -1138,15 +1138,20 @@ private[group] class GroupCoordinator(
}
}
def handleDescribeGroup(groupId: String): (Errors, GroupSummary) = {
def handleDescribeGroup(groupId: String, apiVersion: Short): (Errors, Option[String], GroupSummary) = {
validateGroupStatus(groupId, ApiKeys.DESCRIBE_GROUPS) match {
case Some(error) => (error, GroupCoordinator.EmptyGroup)
case Some(error) => (error, None, GroupCoordinator.EmptyGroup)
case None =>
groupManager.getGroup(groupId) match {
case None => (Errors.NONE, GroupCoordinator.DeadGroup)
case None =>
if (apiVersion >= 6) {
(Errors.GROUP_ID_NOT_FOUND, Some(s"Group $groupId not found."), GroupCoordinator.DeadGroup)
} else {
(Errors.NONE, None, GroupCoordinator.DeadGroup)
}
case Some(group) =>
group.inLock {
(Errors.NONE, group.summary)
(Errors.NONE, None, group.summary)
}
}
}

View File

@ -249,10 +249,11 @@ private[group] class GroupCoordinatorAdapter(
): CompletableFuture[util.List[DescribeGroupsResponseData.DescribedGroup]] = {
def describeGroup(groupId: String): DescribeGroupsResponseData.DescribedGroup = {
val (error, summary) = coordinator.handleDescribeGroup(groupId)
val (error, errorMessage, summary) = coordinator.handleDescribeGroup(groupId, context.apiVersion())
new DescribeGroupsResponseData.DescribedGroup()
.setErrorCode(error.code)
.setErrorMessage(errorMessage.orNull)
.setGroupId(groupId)
.setGroupState(summary.state)
.setProtocolType(summary.protocolType)

View File

@ -1610,7 +1610,8 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
createTopicWithBrokerPrincipal(topic)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, DESCRIBE, ALLOW)), groupResource)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, DESCRIBE, ALLOW)), topicResource)
createAdminClient().describeConsumerGroups(Seq(group).asJava).describedGroups().get(group).get()
val result = createAdminClient().describeConsumerGroups(Seq(group).asJava)
JTestUtils.assertFutureThrows(result.describedGroups().get(group), classOf[GroupIdNotFoundException])
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)

View File

@ -19,6 +19,7 @@ import org.apache.kafka.common.test.api.ClusterTestExtensions
import kafka.utils.TestUtils
import org.apache.kafka.clients.admin.{Admin, ConsumerGroupDescription}
import org.apache.kafka.clients.consumer.{Consumer, GroupProtocol, OffsetAndMetadata}
import org.apache.kafka.common.errors.GroupIdNotFoundException
import org.apache.kafka.common.{ConsumerGroupState, GroupType, KafkaFuture, TopicPartition}
import org.junit.jupiter.api.Assertions._
@ -221,7 +222,7 @@ class GroupCoordinatorIntegrationTest(cluster: ClusterInstance) {
.asScala
.toMap
assertDescribedGroup(groups, "grp3", GroupType.CLASSIC, ConsumerGroupState.DEAD)
assertDescribedDeadGroup(groups, "grp3")
}
}
@ -328,4 +329,18 @@ class GroupCoordinatorIntegrationTest(cluster: ClusterInstance) {
assertEquals(state, group.state)
assertEquals(Collections.emptyList, group.members)
}
private def assertDescribedDeadGroup(
groups: Map[String, KafkaFuture[ConsumerGroupDescription]],
groupId: String
): Unit = {
try {
groups(groupId).get(10, TimeUnit.SECONDS)
fail(s"Group $groupId should not be found")
} catch {
case e: java.util.concurrent.ExecutionException =>
assertTrue(e.getCause.isInstanceOf[GroupIdNotFoundException])
assertEquals(s"Group $groupId not found.", e.getCause.getMessage)
}
}
}

View File

@ -1936,18 +1936,14 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
val expectedOperations = AclEntry.supportedOperations(ResourceType.GROUP)
assertEquals(expectedOperations, testGroupDescription.authorizedOperations())
// Test that the fake group is listed as dead.
// Test that the fake group throws GroupIdNotFoundException
assertTrue(describeWithFakeGroupResult.describedGroups().containsKey(fakeGroupId))
val fakeGroupDescription = describeWithFakeGroupResult.describedGroups().get(fakeGroupId).get()
assertFutureThrows(describeWithFakeGroupResult.describedGroups().get(fakeGroupId), classOf[GroupIdNotFoundException],
s"Group $fakeGroupId not found.")
assertEquals(fakeGroupId, fakeGroupDescription.groupId())
assertEquals(0, fakeGroupDescription.members().size())
assertEquals("", fakeGroupDescription.partitionAssignor())
assertEquals(GroupState.DEAD, fakeGroupDescription.groupState())
assertEquals(expectedOperations, fakeGroupDescription.authorizedOperations())
// Test that all() returns 2 results
assertEquals(2, describeWithFakeGroupResult.all().get().size())
// Test that all() also throws GroupIdNotFoundException
assertFutureThrows(describeWithFakeGroupResult.all(), classOf[GroupIdNotFoundException],
s"Group $fakeGroupId not found.")
val testTopicPart0 = new TopicPartition(testTopicName, 0)
@ -2209,18 +2205,14 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
val expectedOperations = AclEntry.supportedOperations(ResourceType.GROUP)
assertEquals(expectedOperations, testGroupDescription.authorizedOperations())
// Test that the fake group is listed as dead.
// Test that the fake group throws GroupIdNotFoundException
assertTrue(describeWithFakeGroupResult.describedGroups().containsKey(fakeGroupId))
val fakeGroupDescription = describeWithFakeGroupResult.describedGroups().get(fakeGroupId).get()
assertFutureThrows(describeWithFakeGroupResult.describedGroups().get(fakeGroupId),
classOf[GroupIdNotFoundException], s"Group $fakeGroupId not found.")
assertEquals(fakeGroupId, fakeGroupDescription.groupId())
assertEquals(0, fakeGroupDescription.members().size())
assertEquals("", fakeGroupDescription.partitionAssignor())
assertEquals(ConsumerGroupState.DEAD, fakeGroupDescription.state())
assertEquals(expectedOperations, fakeGroupDescription.authorizedOperations())
// Test that all() returns 2 results
assertEquals(2, describeWithFakeGroupResult.all().get().size())
// Test that all() also throws GroupIdNotFoundException
assertFutureThrows(describeWithFakeGroupResult.all(),
classOf[GroupIdNotFoundException], s"Group $fakeGroupId not found.")
val testTopicPart0 = new TopicPartition(testTopicName, 0)
@ -2642,17 +2634,14 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
val expectedOperations = AclEntry.supportedOperations(ResourceType.GROUP)
assertEquals(expectedOperations, testGroupDescription.authorizedOperations())
// Test that the fake group is listed as dead.
// Test that the fake group throws GroupIdNotFoundException
assertTrue(describeWithFakeGroupResult.describedGroups().containsKey(fakeGroupId))
val fakeGroupDescription = describeWithFakeGroupResult.describedGroups().get(fakeGroupId).get()
assertFutureThrows(describeWithFakeGroupResult.describedGroups().get(fakeGroupId),
classOf[GroupIdNotFoundException], s"Group $fakeGroupId not found.")
assertEquals(fakeGroupId, fakeGroupDescription.groupId())
assertEquals(0, fakeGroupDescription.members().size())
assertEquals(GroupState.DEAD, fakeGroupDescription.groupState())
assertNull(fakeGroupDescription.authorizedOperations())
// Test that all() returns 2 results
assertEquals(2, describeWithFakeGroupResult.all().get().size())
// Test that all() also throws GroupIdNotFoundException
assertFutureThrows(describeWithFakeGroupResult.all(),
classOf[GroupIdNotFoundException], s"Group $fakeGroupId not found.")
val describeTestGroupResult = client.describeShareGroups(Collections.singleton(testGroupId),
new DescribeShareGroupsOptions().includeAuthorizedOperations(true))
@ -2664,18 +2653,12 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
assertEquals(testGroupId, testGroupDescription.groupId)
assertEquals(consumerSet.size, testGroupDescription.members().size())
// Describing a share group using describeConsumerGroups reports it as a DEAD consumer group
// in the same way as a non-existent group
// Describing a share group using describeConsumerGroups reports it as a non-existent group
// but the error message is different
val describeConsumerGroupResult = client.describeConsumerGroups(Collections.singleton(testGroupId),
new DescribeConsumerGroupsOptions().includeAuthorizedOperations(true))
assertEquals(1, describeConsumerGroupResult.all().get().size())
val deadConsumerGroupDescription = describeConsumerGroupResult.describedGroups().get(testGroupId).get()
assertEquals(testGroupId, deadConsumerGroupDescription.groupId())
assertEquals(0, deadConsumerGroupDescription.members().size())
assertEquals("", deadConsumerGroupDescription.partitionAssignor())
assertEquals(ConsumerGroupState.DEAD, deadConsumerGroupDescription.state())
assertEquals(expectedOperations, deadConsumerGroupDescription.authorizedOperations())
assertFutureThrows(describeConsumerGroupResult.all(),
classOf[GroupIdNotFoundException], s"Group $testGroupId is not a consumer group.")
} finally {
consumerThreads.foreach {
case consumerThread =>

View File

@ -414,12 +414,12 @@ class GroupCoordinatorAdapterTest {
))
)
when(groupCoordinator.handleDescribeGroup(groupId1)).thenReturn {
(Errors.NONE, groupSummary1)
when(groupCoordinator.handleDescribeGroup(groupId1, ApiKeys.DESCRIBE_GROUPS.latestVersion)).thenReturn {
(Errors.NONE, None, groupSummary1)
}
when(groupCoordinator.handleDescribeGroup(groupId2)).thenReturn {
(Errors.NOT_COORDINATOR, GroupCoordinator.EmptyGroup)
when(groupCoordinator.handleDescribeGroup(groupId2, ApiKeys.DESCRIBE_GROUPS.latestVersion)).thenReturn {
(Errors.NOT_COORDINATOR, None, GroupCoordinator.EmptyGroup)
}
val ctx = makeContext(ApiKeys.DESCRIBE_GROUPS, ApiKeys.DESCRIBE_GROUPS.latestVersion)

View File

@ -175,7 +175,7 @@ class GroupCoordinatorTest {
assertEquals(Some(Errors.NONE), heartbeatError)
// DescribeGroups
val (describeGroupError, _) = groupCoordinator.handleDescribeGroup(otherGroupId)
val (describeGroupError, _, _) = groupCoordinator.handleDescribeGroup(otherGroupId, ApiKeys.DESCRIBE_GROUPS.latestVersion)
assertEquals(Errors.COORDINATOR_LOAD_IN_PROGRESS, describeGroupError)
// ListGroups
@ -187,15 +187,16 @@ class GroupCoordinatorTest {
assertEquals(Some(Errors.COORDINATOR_LOAD_IN_PROGRESS), deleteGroupsErrors.get(otherGroupId))
// Check that non-loading groups are still accessible
assertEquals(Errors.NONE, groupCoordinator.handleDescribeGroup(groupId)._1)
assertEquals(Errors.GROUP_ID_NOT_FOUND, groupCoordinator.handleDescribeGroup(groupId, ApiKeys.DESCRIBE_GROUPS.latestVersion)._1)
// After loading, we should be able to access the group
val otherGroupMetadataTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, otherGroupPartitionId)
when(replicaManager.getLog(otherGroupMetadataTopicPartition)).thenReturn(None)
// Call removeGroupsAndOffsets so that partition removed from loadingPartitions
groupCoordinator.groupManager.removeGroupsAndOffsets(otherGroupMetadataTopicPartition, OptionalInt.of(1), group => {})
groupCoordinator.groupManager.loadGroupsAndOffsets(otherGroupMetadataTopicPartition, 1, group => {}, 0L)
assertEquals(Errors.NONE, groupCoordinator.handleDescribeGroup(otherGroupId)._1)
assertEquals(Errors.GROUP_ID_NOT_FOUND, groupCoordinator.handleDescribeGroup(otherGroupId, ApiKeys.DESCRIBE_GROUPS.latestVersion)._1)
}
@Test
@ -2609,8 +2610,9 @@ class GroupCoordinatorTest {
assertEquals(Errors.NONE, fetchError)
assertEquals(Some(0), partitionData.get(tip.topicPartition).map(_.offset))
val (describeError, summary) = groupCoordinator.handleDescribeGroup(groupId)
var (describeError, describeErrorMessage, summary) = groupCoordinator.handleDescribeGroup(groupId, ApiKeys.DESCRIBE_GROUPS.latestVersion)
assertEquals(Errors.NONE, describeError)
assertTrue(describeErrorMessage.isEmpty)
assertEquals(Empty.toString, summary.state)
val groupTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
@ -3405,15 +3407,21 @@ class GroupCoordinatorTest {
@Test
def testDescribeGroupWrongCoordinator(): Unit = {
val (error, _) = groupCoordinator.handleDescribeGroup(otherGroupId)
val (error, _, _) = groupCoordinator.handleDescribeGroup(otherGroupId, ApiKeys.DESCRIBE_GROUPS.latestVersion)
assertEquals(Errors.NOT_COORDINATOR, error)
}
@Test
def testDescribeGroupInactiveGroup(): Unit = {
val (error, summary) = groupCoordinator.handleDescribeGroup(groupId)
val (error, errorMessage, summary) = groupCoordinator.handleDescribeGroup(groupId, 5)
assertEquals(Errors.NONE, error)
assertTrue(errorMessage.isEmpty)
assertEquals(GroupCoordinator.DeadGroup, summary)
val (errorV6, errorMessageV6, summaryV6) = groupCoordinator.handleDescribeGroup(groupId, 6)
assertEquals(Errors.GROUP_ID_NOT_FOUND, errorV6)
assertEquals(s"Group $groupId not found.", errorMessageV6.get)
assertEquals(GroupCoordinator.DeadGroup, summaryV6)
}
@Test
@ -3427,8 +3435,9 @@ class GroupCoordinatorTest {
val syncGroupResult = syncGroupLeader(groupId, generationId, assignedMemberId, Map(assignedMemberId -> Array[Byte]()))
assertEquals(Errors.NONE, syncGroupResult.error)
val (error, summary) = groupCoordinator.handleDescribeGroup(groupId)
val (error, errorMessage, summary) = groupCoordinator.handleDescribeGroup(groupId, ApiKeys.DESCRIBE_GROUPS.latestVersion)
assertEquals(Errors.NONE, error)
assertTrue(errorMessage.isEmpty)
assertEquals(protocolType, summary.protocolType)
assertEquals("range", summary.protocol)
assertEquals(List(assignedMemberId), summary.members.map(_.memberId))
@ -3445,8 +3454,9 @@ class GroupCoordinatorTest {
val syncGroupResult = syncGroupLeader(groupId, generationId, assignedMemberId, Map(assignedMemberId -> Array[Byte]()))
assertEquals(Errors.NONE, syncGroupResult.error)
val (error, summary) = groupCoordinator.handleDescribeGroup(groupId)
val (error, errorMessage, summary) = groupCoordinator.handleDescribeGroup(groupId, ApiKeys.DESCRIBE_GROUPS.latestVersion)
assertEquals(Errors.NONE, error)
assertTrue(errorMessage.isEmpty)
assertEquals(protocolType, summary.protocolType)
assertEquals("range", summary.protocol)
assertEquals(List(assignedMemberId), summary.members.map(_.memberId))
@ -3460,8 +3470,9 @@ class GroupCoordinatorTest {
val joinGroupError = joinGroupResult.error
assertEquals(Errors.NONE, joinGroupError)
val (error, summary) = groupCoordinator.handleDescribeGroup(groupId)
val (error, errorMessage, summary) = groupCoordinator.handleDescribeGroup(groupId, ApiKeys.DESCRIBE_GROUPS.latestVersion)
assertEquals(Errors.NONE, error)
assertTrue(errorMessage.isEmpty)
assertEquals(protocolType, summary.protocolType)
assertEquals(GroupCoordinator.NoProtocol, summary.protocol)
assertEquals(CompletingRebalance.toString, summary.state)
@ -3528,9 +3539,9 @@ class GroupCoordinatorTest {
val commitOffsetResult = commitOffsets(groupId, assignedMemberId, joinGroupResult.generationId, Map(tip -> offset))
assertEquals(Map(tip -> Errors.NONE), commitOffsetResult)
val describeGroupResult = groupCoordinator.handleDescribeGroup(groupId)
assertEquals(Stable.toString, describeGroupResult._2.state)
assertEquals(assignedMemberId, describeGroupResult._2.members.head.memberId)
val describeGroupResult = groupCoordinator.handleDescribeGroup(groupId, ApiKeys.DESCRIBE_GROUPS.latestVersion)
assertEquals(Stable.toString, describeGroupResult._3.state)
assertEquals(assignedMemberId, describeGroupResult._3.members.head.memberId)
val leaveGroupResults = singleLeaveGroup(groupId, assignedMemberId)
verifyLeaveGroupResult(leaveGroupResults)
@ -3545,7 +3556,7 @@ class GroupCoordinatorTest {
val result = groupCoordinator.handleDeleteGroups(Set(groupId))
assert(result.size == 1 && result.contains(groupId) && result.get(groupId).contains(Errors.NONE))
assertEquals(Dead.toString, groupCoordinator.handleDescribeGroup(groupId)._2.state)
assertEquals(Dead.toString, groupCoordinator.handleDescribeGroup(groupId, ApiKeys.DESCRIBE_GROUPS.latestVersion)._3.state)
}
@Test

View File

@ -123,6 +123,8 @@ class DeleteGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinator
List(new DescribedGroup()
.setGroupId("grp")
.setGroupState(ClassicGroupState.DEAD.toString)
.setErrorCode(Errors.GROUP_ID_NOT_FOUND.code)
.setErrorMessage("Group grp not found.")
),
describeGroups(List("grp"))
)

View File

@ -20,7 +20,7 @@ import org.apache.kafka.common.test.api.ClusterInstance
import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type}
import org.apache.kafka.common.test.api.ClusterTestExtensions
import org.apache.kafka.common.message.DescribeGroupsResponseData.{DescribedGroup, DescribedGroupMember}
import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.coordinator.group.classic.ClassicGroupState
import org.junit.jupiter.api.Assertions.assertEquals
@ -106,6 +106,8 @@ class DescribeGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinat
new DescribedGroup()
.setGroupId("grp-unknown")
.setGroupState(ClassicGroupState.DEAD.toString) // Return DEAD group when the group does not exist.
.setErrorCode(if (version >= 6) Errors.GROUP_ID_NOT_FOUND.code() else Errors.NONE.code())
.setErrorMessage(if (version >= 6) "Group grp-unknown not found." else null)
),
describeGroups(
groupIds = List("grp-1", "grp-2", "grp-unknown"),

View File

@ -3719,7 +3719,8 @@ class KafkaApisTest extends Logging {
val describeGroupsRequest = new DescribeGroupsRequestData().setGroups(List(
"group-1",
"group-2",
"group-3"
"group-3",
"group-4"
).asJava)
val requestChannelRequest = buildRequest(new DescribeGroupsRequest.Builder(describeGroupsRequest).build())
@ -3746,7 +3747,12 @@ class KafkaApisTest extends Logging {
.setErrorCode(Errors.NOT_COORDINATOR.code),
new DescribeGroupsResponseData.DescribedGroup()
.setGroupId("group-3")
.setErrorCode(Errors.REQUEST_TIMED_OUT.code)
.setErrorCode(Errors.REQUEST_TIMED_OUT.code),
new DescribeGroupsResponseData.DescribedGroup()
.setGroupId("group-4")
.setGroupState("Dead")
.setErrorCode(Errors.GROUP_ID_NOT_FOUND.code)
.setErrorMessage("Group group-4 is not a classic group.")
).asJava
future.complete(groupResults)

View File

@ -161,12 +161,19 @@
</li>
</ul>
</li>
<li><b>Admin</b>
<li><b>Admin client</b>
<ul>
<li>
The <code>alterConfigs</code> method was removed from the <code>org.apache.kafka.clients.admin.Admin</code>
The <code>alterConfigs</code> method was removed from the <code>org.apache.kafka.clients.admin.Admin</code>.
Please use <code>incrementalAlterConfigs</code> instead.
</li>
<li>The <code>org.apache.kafka.common.ConsumerGroupState</code> enumeration and related methods have been deprecated. Please use <code>GroupState</code> instead
which applies to all types of group.
</li>
<li>The <code>Admin.describeConsumerGroups</code> method used to return a <code>ConsumerGroupDescription</code> in state
<code>DEAD</code> if the group ID was not found. In Apache Kafka 4.0, the <code>GroupIdNotFoundException</code>
is thrown instead as part of the support for new types of group.
</li>
</ul>
</li>
</ul>
@ -185,10 +192,11 @@
See <a href="https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181308223">KIP-750</a> for more details
</li>
<li>
KafkaLog4jAppender has been remove, users should migrate to the log4j2 appender
KafkaLog4jAppender has been removed, users should migrate to the log4j2 appender
See <a href="https://logging.apache.org/log4j/2.x/manual/appenders.html#KafkaAppender">KafkaAppender</a> for more details
</li>
<li>The <code>--delete-config</code> option in the <code>kafka-topics</code> command line tool has been deprecated.
<li>
The <code>--delete-config</code> option in the <code>kafka-topics</code> command line tool has been deprecated.
</li>
<li>
For implementors of RemoteLogMetadataManager (RLMM), a new API `nextSegmentWithTxnIndex` is

View File

@ -603,7 +603,7 @@ public class GroupCoordinatorShard implements CoordinatorShard<CoordinatorRecord
List<String> groupIds,
long committedOffset
) {
return groupMetadataManager.describeGroups(groupIds, committedOffset);
return groupMetadataManager.describeGroups(context, groupIds, committedOffset);
}
/**

View File

@ -521,6 +521,7 @@ public class GroupMetadataManager {
describedGroups.add(new ConsumerGroupDescribeResponseData.DescribedGroup()
.setGroupId(groupId)
.setErrorCode(Errors.GROUP_ID_NOT_FOUND.code())
.setErrorMessage(exception.getMessage())
);
}
});
@ -552,6 +553,7 @@ public class GroupMetadataManager {
describedGroups.add(new ShareGroupDescribeResponseData.DescribedGroup()
.setGroupId(groupId)
.setErrorCode(Errors.GROUP_ID_NOT_FOUND.code())
.setErrorMessage(exception.getMessage())
);
}
});
@ -562,12 +564,14 @@ public class GroupMetadataManager {
/**
* Handles a DescribeGroup request.
*
* @param context The request context.
* @param groupIds The IDs of the groups to describe.
* @param committedOffset A specified committed offset corresponding to this shard.
*
* @return A list containing the DescribeGroupsResponseData.DescribedGroup.
*/
public List<DescribeGroupsResponseData.DescribedGroup> describeGroups(
RequestContext context,
List<String> groupIds,
long committedOffset
) {
@ -603,11 +607,20 @@ public class GroupMetadataManager {
);
}
} catch (GroupIdNotFoundException exception) {
if (context.header.apiVersion() >= 6) {
describedGroups.add(new DescribeGroupsResponseData.DescribedGroup()
.setGroupId(groupId)
.setGroupState(DEAD.toString())
.setErrorCode(Errors.GROUP_ID_NOT_FOUND.code())
.setErrorMessage(exception.getMessage())
);
} else {
describedGroups.add(new DescribeGroupsResponseData.DescribedGroup()
.setGroupId(groupId)
.setGroupState(DEAD.toString())
);
}
}
});
return describedGroups;
}
@ -647,7 +660,7 @@ public class GroupMetadataManager {
} else if (createIfNotExists && group.type() == CLASSIC && validateOnlineUpgrade((ClassicGroup) group)) {
return convertToConsumerGroup((ClassicGroup) group, records);
} else {
throw new GroupIdNotFoundException(String.format("Group %s is not a consumer group", groupId));
throw new GroupIdNotFoundException(String.format("Group %s is not a consumer group.", groupId));
}
}
}
@ -670,7 +683,7 @@ public class GroupMetadataManager {
if (group.type() == CONSUMER) {
return (ConsumerGroup) group;
} else {
throw new GroupIdNotFoundException(String.format("Group %s is not a consumer group", groupId));
throw new GroupIdNotFoundException(String.format("Group %s is not a consumer group.", groupId));
}
}
@ -704,7 +717,7 @@ public class GroupMetadataManager {
Group group = groups.get(groupId);
if (group == null && !createIfNotExists) {
throw new GroupIdNotFoundException(String.format("Consumer group %s not found", groupId));
throw new GroupIdNotFoundException(String.format("Consumer group %s not found.", groupId));
}
if (group == null) {

View File

@ -8699,7 +8699,8 @@ public class GroupMetadataManagerTest {
List<ConsumerGroupDescribeResponseData.DescribedGroup> actual = context.sendConsumerGroupDescribe(List.of(groupId));
ConsumerGroupDescribeResponseData.DescribedGroup describedGroup = new ConsumerGroupDescribeResponseData.DescribedGroup()
.setGroupId(groupId)
.setErrorCode(Errors.GROUP_ID_NOT_FOUND.code());
.setErrorCode(Errors.GROUP_ID_NOT_FOUND.code())
.setErrorMessage("Group " + groupId + " not found.");
List<ConsumerGroupDescribeResponseData.DescribedGroup> expected = List.of(
describedGroup
);
@ -8741,7 +8742,8 @@ public class GroupMetadataManagerTest {
List<ConsumerGroupDescribeResponseData.DescribedGroup> actual = context.groupMetadataManager.consumerGroupDescribe(List.of(consumerGroupId), context.lastCommittedOffset);
ConsumerGroupDescribeResponseData.DescribedGroup describedGroup = new ConsumerGroupDescribeResponseData.DescribedGroup()
.setGroupId(consumerGroupId)
.setErrorCode(Errors.GROUP_ID_NOT_FOUND.code());
.setErrorCode(Errors.GROUP_ID_NOT_FOUND.code())
.setErrorMessage("Group " + consumerGroupId + " not found.");
List<ConsumerGroupDescribeResponseData.DescribedGroup> expected = List.of(
describedGroup
);
@ -8873,6 +8875,13 @@ public class GroupMetadataManagerTest {
context.verifyDescribeGroupsReturnsDeadGroup("group-id");
}
@Test
public void testDescribeGroupsBeforeV6GroupIdNotFoundException() {
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.build();
context.verifyDescribeGroupsBeforeV6ReturnsDeadGroup("group-id");
}
@Test
public void testGroupStuckInRebalanceTimeoutDueToNonjoinedStaticMember() throws Exception {
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
@ -14976,6 +14985,7 @@ public class GroupMetadataManagerTest {
new ConsumerGroupDescribeResponseData.DescribedGroup()
.setGroupId(groupId)
.setErrorCode(Errors.GROUP_ID_NOT_FOUND.code())
.setErrorMessage("Group " + groupId + " is not a consumer group.")
);
List<ConsumerGroupDescribeResponseData.DescribedGroup> actual = context.sendConsumerGroupDescribe(List.of(groupId));
@ -15048,6 +15058,7 @@ public class GroupMetadataManagerTest {
new ShareGroupDescribeResponseData.DescribedGroup()
.setGroupId(groupId)
.setErrorCode(Errors.GROUP_ID_NOT_FOUND.code())
.setErrorMessage("Group " + groupId + " is not a share group.")
);
List<ShareGroupDescribeResponseData.DescribedGroup> actual = context.sendShareGroupDescribe(List.of(groupId));

View File

@ -1270,7 +1270,43 @@ public class GroupMetadataManagerTestContext {
}
public List<DescribeGroupsResponseData.DescribedGroup> describeGroups(List<String> groupIds) {
return groupMetadataManager.describeGroups(groupIds, lastCommittedOffset);
RequestContext context = new RequestContext(
new RequestHeader(
ApiKeys.DESCRIBE_GROUPS,
ApiKeys.DESCRIBE_GROUPS.latestVersion(),
DEFAULT_CLIENT_ID,
0
),
"1",
DEFAULT_CLIENT_ADDRESS,
KafkaPrincipal.ANONYMOUS,
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT),
SecurityProtocol.PLAINTEXT,
ClientInformation.EMPTY,
false
);
return groupMetadataManager.describeGroups(context, groupIds, lastCommittedOffset);
}
public List<DescribeGroupsResponseData.DescribedGroup> describeGroups(List<String> groupIds, short apiVersion) {
RequestContext context = new RequestContext(
new RequestHeader(
ApiKeys.DESCRIBE_GROUPS,
apiVersion,
DEFAULT_CLIENT_ID,
0
),
"1",
DEFAULT_CLIENT_ADDRESS,
KafkaPrincipal.ANONYMOUS,
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT),
SecurityProtocol.PLAINTEXT,
ClientInformation.EMPTY,
false
);
return groupMetadataManager.describeGroups(context, groupIds, lastCommittedOffset);
}
public List<ShareGroupDescribeResponseData.DescribedGroup> sendShareGroupDescribe(List<String> groupIds) {
@ -1390,6 +1426,21 @@ public class GroupMetadataManagerTestContext {
List.of(new DescribeGroupsResponseData.DescribedGroup()
.setGroupId(groupId)
.setGroupState(DEAD.toString())
.setErrorCode(Errors.GROUP_ID_NOT_FOUND.code())
.setErrorMessage("Group " + groupId + " not found.")
),
describedGroups
);
}
public void verifyDescribeGroupsBeforeV6ReturnsDeadGroup(String groupId) {
List<DescribeGroupsResponseData.DescribedGroup> describedGroups =
describeGroups(Collections.singletonList(groupId), (short) 5);
assertEquals(
Collections.singletonList(new DescribeGroupsResponseData.DescribedGroup()
.setGroupId(groupId)
.setGroupState(DEAD.toString())
),
describedGroups
);

View File

@ -29,6 +29,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.utils.Time;
@ -1002,7 +1003,9 @@ public class IntegrationTestUtils {
.get(applicationId)
.get();
return groupDescription.members().isEmpty();
} catch (final ExecutionException | InterruptedException e) {
} catch (final ExecutionException e) {
return e.getCause() instanceof GroupIdNotFoundException;
} catch (final InterruptedException e) {
return false;
}
}

View File

@ -29,6 +29,7 @@ import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.requests.ListOffsetsResponse;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.utils.Exit;
@ -170,6 +171,7 @@ public class StreamsResetter {
final DescribeConsumerGroupsResult describeResult = adminClient.describeConsumerGroups(
Collections.singleton(groupId),
new DescribeConsumerGroupsOptions().timeoutMs(10 * 1000));
try {
final List<MemberDescription> members =
new ArrayList<>(describeResult.describedGroups().get(groupId).get().members());
if (!members.isEmpty()) {
@ -183,6 +185,12 @@ public class StreamsResetter {
+ " You can use option '--force' to remove active members from the group.");
}
}
} catch (ExecutionException ee) {
// If the group ID is not found, this is not an error case
if (!(ee.getCause() instanceof GroupIdNotFoundException)) {
throw ee;
}
}
}
private int maybeResetInputAndSeekToEndIntermediateTopicOffsets(final Map<Object, Object> consumerConfig,

View File

@ -44,6 +44,7 @@ import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ListOffsetsResponse;
import org.apache.kafka.common.utils.Utils;
@ -569,6 +570,28 @@ public class ConsumerGroupCommand {
switch (state) {
case "Empty":
case "Dead":
result.put(groupId, resetOffsetsForInactiveGroup(groupId));
break;
default:
printError("Assignments can only be reset if the group '" + groupId + "' is inactive, but the current state is " + state + ".", Optional.empty());
result.put(groupId, Collections.emptyMap());
}
} catch (InterruptedException ie) {
throw new RuntimeException(ie);
} catch (ExecutionException ee) {
if (ee.getCause() instanceof GroupIdNotFoundException) {
result.put(groupId, resetOffsetsForInactiveGroup(groupId));
} else {
throw new RuntimeException(ee);
}
}
});
return result;
}
private Map<TopicPartition, OffsetAndMetadata> resetOffsetsForInactiveGroup(String groupId) {
try {
Collection<TopicPartition> partitionsToReset = getPartitionsToReset(groupId);
Map<TopicPartition, OffsetAndMetadata> preparedOffsets = prepareOffsetsToReset(groupId, partitionsToReset);
@ -582,19 +605,17 @@ public class ConsumerGroupCommand {
).all().get();
}
result.put(groupId, preparedOffsets);
break;
default:
printError("Assignments can only be reset if the group '" + groupId + "' is inactive, but the current state is " + state + ".", Optional.empty());
result.put(groupId, Collections.emptyMap());
return preparedOffsets;
} catch (InterruptedException ie) {
throw new RuntimeException(ie);
} catch (ExecutionException ee) {
Throwable cause = ee.getCause();
if (cause instanceof KafkaException) {
throw (KafkaException) cause;
} else {
throw new RuntimeException(cause);
}
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
});
return result;
}
Entry<Errors, Map<TopicPartition, Throwable>> deleteOffsets(String groupId, List<String> topics) {
@ -702,7 +723,7 @@ public class ConsumerGroupCommand {
System.out.printf(format,
tp.topic(),
tp.partition() >= 0 ? tp.partition() : "Not Provided",
error != null ? "Error: :" + error.getMessage() : "Successful"
error != null ? "Error: " + error.getMessage() : "Successful"
);
});
System.out.println();
@ -1231,8 +1252,10 @@ public class ConsumerGroupCommand {
try {
f.get();
success.put(g, null);
} catch (ExecutionException | InterruptedException e) {
failed.put(g, e);
} catch (InterruptedException ie) {
failed.put(g, ie);
} catch (ExecutionException e) {
failed.put(g, e.getCause());
}
});

View File

@ -257,7 +257,7 @@ public class ConsumerGroupCommandOptions extends CommandDefaultOptions {
CommandLineUtils.printUsageAndExit(parser, "Option " + resetOffsetsOpt + " only accepts one of " + executeOpt + " and " + dryRunOpt);
if (!options.has(dryRunOpt) && !options.has(executeOpt)) {
System.err.println("WARN: No action will be performed as the --execute option is missing." +
System.err.println("WARN: No action will be performed as the --execute option is missing. " +
"In a future major release, the default behavior of this command will be to prompt the user before " +
"executing the reset rather than doing a dry run. You should add the --dry-run option explicitly " +
"if you are scripting this command and want to keep the current default behavior without prompting.");

View File

@ -19,16 +19,20 @@ package org.apache.kafka.tools.consumer.group;
import kafka.api.AbstractAuthorizerIntegrationTest;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import java.util.Collections;
import java.util.concurrent.ExecutionException;
import scala.jdk.javaapi.CollectionConverters;
import static org.apache.kafka.common.acl.AclOperation.DESCRIBE;
import static org.apache.kafka.common.acl.AclPermissionType.ALLOW;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.fail;
public class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
@ParameterizedTest
@ -38,8 +42,12 @@ public class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest
String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", group()};
ConsumerGroupCommandOptions opts = ConsumerGroupCommandOptions.fromArgs(cgcArgs);
ConsumerGroupCommand.ConsumerGroupService consumerGroupService = new ConsumerGroupCommand.ConsumerGroupService(opts, Collections.emptyMap());
try (ConsumerGroupCommand.ConsumerGroupService consumerGroupService = new ConsumerGroupCommand.ConsumerGroupService(opts, Collections.emptyMap())) {
consumerGroupService.describeGroups();
consumerGroupService.close();
fail("Non-existent group should throw an exception");
} catch (ExecutionException e) {
assertInstanceOf(GroupIdNotFoundException.class, e.getCause(),
"Non-existent group should throw GroupIdNotFoundException");
}
}
}

View File

@ -100,7 +100,7 @@ public class DeleteConsumerGroupsTest {
assertEquals(1, result.size());
assertNotNull(result.get(missingGroupId));
assertInstanceOf(GroupIdNotFoundException.class,
result.get(missingGroupId).getCause(),
result.get(missingGroupId),
"The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was not detected while deleting consumer group");
}
}
@ -132,7 +132,7 @@ public class DeleteConsumerGroupsTest {
assertEquals(1, result.size());
assertNotNull(result.get(groupId));
assertInstanceOf(GroupNotEmptyException.class,
result.get(groupId).getCause(),
result.get(groupId),
"The expected error (" + Errors.NON_EMPTY_GROUP + ") was not detected while deleting consumer group. Result was:(" + result + ")");
}
}

View File

@ -28,6 +28,7 @@ import org.apache.kafka.clients.consumer.RangeAssignor;
import org.apache.kafka.clients.consumer.RoundRobinAssignor;
import org.apache.kafka.common.GroupState;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.test.api.ClusterConfig;
@ -68,6 +69,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@ExtendWith(value = ClusterTestExtensions.class)
public class DescribeConsumerGroupTest {
@ -92,9 +94,13 @@ public class DescribeConsumerGroupTest {
List<String> cgcArgs = new ArrayList<>(Arrays.asList("--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", missingGroup));
cgcArgs.addAll(describeType);
try (ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(cgcArgs.toArray(new String[0]))) {
String output = ToolsTestUtils.grabConsoleOutput(describeGroups(service));
assertTrue(output.contains("Consumer group '" + missingGroup + "' does not exist."),
"Expected error was not detected for describe option '" + String.join(" ", describeType) + "'");
service.describeGroups();
fail("Expected error was not detected for describe option '" + String.join(" ", describeType) + "'");
} catch (ExecutionException ee) {
assertInstanceOf(GroupIdNotFoundException.class, ee.getCause());
assertEquals("Group " + missingGroup + " not found.", ee.getCause().getMessage());
} catch (Exception e) {
fail("Expected error was not detected for describe option '" + String.join(" ", describeType) + "'");
}
}
}
@ -113,9 +119,11 @@ public class DescribeConsumerGroupTest {
// note the group to be queried is a different (non-existing) group
ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", missingGroup})
) {
Entry<Optional<GroupState>, Optional<Collection<PartitionAssignmentState>>> res = service.collectGroupOffsets(missingGroup);
assertTrue(res.getKey().map(s -> s.equals(GroupState.DEAD)).orElse(false) && res.getValue().map(Collection::isEmpty).orElse(false),
"Expected the state to be 'Dead', with no members in the group '" + missingGroup + "'.");
service.collectGroupOffsets(missingGroup);
fail("Expected the group '" + missingGroup + "' to throw GroupIdNotFoundException");
} catch (ExecutionException ee) {
assertInstanceOf(GroupIdNotFoundException.class, ee.getCause(),
"Expected the group '" + missingGroup + "' to throw GroupIdNotFoundException");
}
}
}
@ -132,13 +140,11 @@ public class DescribeConsumerGroupTest {
// note the group to be queried is a different (non-existing) group
ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", missingGroup})
) {
Entry<Optional<GroupState>, Optional<Collection<MemberAssignmentState>>> res = service.collectGroupMembers(missingGroup, false);
assertTrue(res.getKey().map(s -> s.equals(GroupState.DEAD)).orElse(false) && res.getValue().map(Collection::isEmpty).orElse(false),
"Expected the state to be 'Dead', with no members in the group '" + missingGroup + "'.");
Entry<Optional<GroupState>, Optional<Collection<MemberAssignmentState>>> res2 = service.collectGroupMembers(missingGroup, true);
assertTrue(res2.getKey().map(s -> s.equals(GroupState.DEAD)).orElse(false) && res2.getValue().map(Collection::isEmpty).orElse(false),
"Expected the state to be 'Dead', with no members in the group '" + missingGroup + "' (verbose option).");
service.collectGroupMembers(missingGroup, false);
fail("Expected the group '" + missingGroup + "' to throw GroupIdNotFoundException");
} catch (ExecutionException ee) {
assertInstanceOf(GroupIdNotFoundException.class, ee.getCause(),
"Expected the group '" + missingGroup + "' to throw GroupIdNotFoundException");
}
}
}
@ -155,11 +161,11 @@ public class DescribeConsumerGroupTest {
// note the group to be queried is a different (non-existing) group
ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", missingGroup})
) {
GroupInformation state = service.collectGroupState(missingGroup);
assertTrue(Objects.equals(state.groupState, GroupState.DEAD) && state.numMembers == 0 &&
state.coordinator != null && clusterInstance.brokerIds().contains(state.coordinator.id()),
"Expected the state to be 'Dead', with no members in the group '" + missingGroup + "'."
);
service.collectGroupState(missingGroup);
fail("Expected the group '" + missingGroup + "' to throw GroupIdNotFoundException");
} catch (ExecutionException ee) {
assertInstanceOf(GroupIdNotFoundException.class, ee.getCause(),
"Expected the group '" + missingGroup + "' to throw GroupIdNotFoundException");
}
}
}

View File

@ -136,10 +136,6 @@ public class ResetConsumerGroupOffsetTest {
String[] args = buildArgsForGroup(cluster, group, "--all-topics", "--to-current", "--execute");
try (ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(args)) {
// Make sure we got a coordinator
TestUtils.waitForCondition(
() -> "localhost".equals(service.collectGroupState(group).coordinator.host()),
"Can't find a coordinator");
Map<TopicPartition, OffsetAndMetadata> resetOffsets = service.resetOffsets().get(group);
assertTrue(resetOffsets.isEmpty());
assertTrue(committedOffsets(cluster, topic, group).isEmpty());