diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java index e66d45e2635..1d911e2f0c7 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java @@ -64,6 +64,7 @@ public class DescribeConsumerGroupsHandler implements AdminApiHandler lookupStrategy; private final Set useClassicGroupApi; + private final Map groupIdNotFoundErrorMessages; public DescribeConsumerGroupsHandler( boolean includeAuthorizedOperations, @@ -73,6 +74,7 @@ public class DescribeConsumerGroupsHandler implements AdminApiHandler(); + this.groupIdNotFoundErrorMessages = new HashMap<>(); } private static Set buildKeySet(Collection groupIds) { @@ -255,7 +257,7 @@ public class DescribeConsumerGroupsHandler implements AdminApiHandler groups = new HashSet<>(); groups.add(GROUP_ID); @@ -3854,6 +3857,72 @@ public class KafkaAdminClientTest { final DescribeConsumerGroupsResult result = env.adminClient().describeConsumerGroups(groups); assertEquals(2, result.describedGroups().size()); assertEquals(groups, result.describedGroups().keySet()); + KafkaFuture> allFuture = result.all(); + // This throws because the second group is a classic connect group, not a consumer group. + assertThrows(ExecutionException.class, allFuture::get); + assertTrue(allFuture.isCompletedExceptionally()); + } + } + + @Test + public void testDescribeConsumerGroupsGroupIdNotFound() { + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + + env.kafkaClient().prepareResponse(new FindCoordinatorResponse( + new FindCoordinatorResponseData() + .setCoordinators(asList( + FindCoordinatorResponse.prepareCoordinatorResponse(Errors.NONE, GROUP_ID, env.cluster().controller()), + FindCoordinatorResponse.prepareCoordinatorResponse(Errors.NONE, "group-connect-0", env.cluster().controller()) + )) + )); + + // The first request sent will be a ConsumerGroupDescribe request. Let's + // fail it in order to fail back to using the classic version. + env.kafkaClient().prepareUnsupportedVersionResponse( + request -> request instanceof ConsumerGroupDescribeRequest); + + TopicPartition myTopicPartition0 = new TopicPartition("my_topic", 0); + TopicPartition myTopicPartition1 = new TopicPartition("my_topic", 1); + TopicPartition myTopicPartition2 = new TopicPartition("my_topic", 2); + + final List topicPartitions = new ArrayList<>(); + topicPartitions.add(0, myTopicPartition0); + topicPartitions.add(1, myTopicPartition1); + topicPartitions.add(2, myTopicPartition2); + + final ByteBuffer memberAssignment = ConsumerProtocol.serializeAssignment(new ConsumerPartitionAssignor.Assignment(topicPartitions)); + byte[] memberAssignmentBytes = new byte[memberAssignment.remaining()]; + memberAssignment.get(memberAssignmentBytes); + + DescribeGroupsResponseData groupData = new DescribeGroupsResponseData(); + groupData.groups().add(DescribeGroupsResponse.groupMetadata( + GROUP_ID, + Errors.NONE, + "", + ConsumerProtocol.PROTOCOL_TYPE, + "", + asList( + DescribeGroupsResponse.groupMember("0", null, "clientId0", "clientHost", memberAssignmentBytes, null), + DescribeGroupsResponse.groupMember("1", null, "clientId1", "clientHost", memberAssignmentBytes, null) + ), + Collections.emptySet())); + groupData.groups().add(DescribeGroupsResponse.groupError( + "group-connect-0", + Errors.GROUP_ID_NOT_FOUND, + "Group group-connect-0 is not a classic group.")); + + env.kafkaClient().prepareResponse(new DescribeGroupsResponse(groupData)); + + Collection groups = new HashSet<>(); + groups.add(GROUP_ID); + groups.add("group-connect-0"); + final DescribeConsumerGroupsResult result = env.adminClient().describeConsumerGroups(groups); + assertEquals(2, result.describedGroups().size()); + assertEquals(groups, result.describedGroups().keySet()); + KafkaFuture> allFuture = result.all(); + assertThrows(ExecutionException.class, allFuture::get); + assertTrue(result.all().isCompletedExceptionally()); } } @@ -4996,6 +5065,59 @@ public class KafkaAdminClientTest { } } + @Test + public void testDescribeShareGroupsGroupIdNotFound() throws Exception { + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + + env.kafkaClient().prepareResponse(new FindCoordinatorResponse( + new FindCoordinatorResponseData() + .setCoordinators(asList( + FindCoordinatorResponse.prepareCoordinatorResponse(Errors.NONE, GROUP_ID, env.cluster().controller()), + FindCoordinatorResponse.prepareCoordinatorResponse(Errors.NONE, "group-1", env.cluster().controller()) + )) + )); + + ShareGroupDescribeResponseData.TopicPartitions topicPartitions = new ShareGroupDescribeResponseData.TopicPartitions() + .setTopicName("my_topic") + .setPartitions(asList(0, 1, 2)); + final ShareGroupDescribeResponseData.Assignment memberAssignment = new ShareGroupDescribeResponseData.Assignment() + .setTopicPartitions(asList(topicPartitions)); + ShareGroupDescribeResponseData groupData = new ShareGroupDescribeResponseData(); + groupData.groups().add(new ShareGroupDescribeResponseData.DescribedGroup() + .setGroupId(GROUP_ID) + .setGroupState(GroupState.STABLE.toString()) + .setMembers(asList( + new ShareGroupDescribeResponseData.Member() + .setMemberId("0") + .setClientId("clientId0") + .setClientHost("clientHost") + .setAssignment(memberAssignment), + new ShareGroupDescribeResponseData.Member() + .setMemberId("1") + .setClientId("clientId1") + .setClientHost("clientHost") + .setAssignment(memberAssignment)))); + groupData.groups().add(new ShareGroupDescribeResponseData.DescribedGroup() + .setGroupId("group-1") + .setGroupState(GroupState.DEAD.toString()) + .setErrorCode(Errors.GROUP_ID_NOT_FOUND.code()) + .setErrorMessage("Group group-1 not found.")); + + env.kafkaClient().prepareResponse(new ShareGroupDescribeResponse(groupData)); + + Collection groups = new HashSet<>(); + groups.add(GROUP_ID); + groups.add("group-1"); + final DescribeShareGroupsResult result = env.adminClient().describeShareGroups(groups); + assertEquals(2, result.describedGroups().size()); + assertEquals(groups, result.describedGroups().keySet()); + KafkaFuture> allFuture = result.all(); + assertThrows(ExecutionException.class, allFuture::get); + assertTrue(result.all().isCompletedExceptionally()); + } + } + @Test public void testDescribeShareGroupsWithAuthorizedOperationsOmitted() throws Exception { try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) { @@ -5024,15 +5146,21 @@ public class KafkaAdminClientTest { try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) { env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); - env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); + env.kafkaClient().prepareResponse(new FindCoordinatorResponse( + new FindCoordinatorResponseData() + .setCoordinators(asList( + FindCoordinatorResponse.prepareCoordinatorResponse(Errors.NONE, GROUP_ID, env.cluster().controller()), + FindCoordinatorResponse.prepareCoordinatorResponse(Errors.NONE, "group-1", env.cluster().controller()) + )) + )); ShareGroupDescribeResponseData.TopicPartitions topicPartitions = new ShareGroupDescribeResponseData.TopicPartitions() .setTopicName("my_topic") .setPartitions(asList(0, 1, 2)); final ShareGroupDescribeResponseData.Assignment memberAssignment = new ShareGroupDescribeResponseData.Assignment() .setTopicPartitions(asList(topicPartitions)); - ShareGroupDescribeResponseData group0Data = new ShareGroupDescribeResponseData(); - group0Data.groups().add(new ShareGroupDescribeResponseData.DescribedGroup() + ShareGroupDescribeResponseData groupData = new ShareGroupDescribeResponseData(); + groupData.groups().add(new ShareGroupDescribeResponseData.DescribedGroup() .setGroupId(GROUP_ID) .setGroupState(GroupState.STABLE.toString()) .setMembers(asList( @@ -5046,9 +5174,7 @@ public class KafkaAdminClientTest { .setClientId("clientId1") .setClientHost("clientHost") .setAssignment(memberAssignment)))); - - ShareGroupDescribeResponseData group1Data = new ShareGroupDescribeResponseData(); - group1Data.groups().add(new ShareGroupDescribeResponseData.DescribedGroup() + groupData.groups().add(new ShareGroupDescribeResponseData.DescribedGroup() .setGroupId("group-1") .setGroupState(GroupState.STABLE.toString()) .setMembers(asList( @@ -5063,8 +5189,7 @@ public class KafkaAdminClientTest { .setClientHost("clientHost") .setAssignment(memberAssignment)))); - env.kafkaClient().prepareResponse(new ShareGroupDescribeResponse(group0Data)); - env.kafkaClient().prepareResponse(new ShareGroupDescribeResponse(group1Data)); + env.kafkaClient().prepareResponse(new ShareGroupDescribeResponse(groupData)); Collection groups = new HashSet<>(); groups.add(GROUP_ID); @@ -5072,6 +5197,9 @@ public class KafkaAdminClientTest { final DescribeShareGroupsResult result = env.adminClient().describeShareGroups(groups); assertEquals(2, result.describedGroups().size()); assertEquals(groups, result.describedGroups().keySet()); + KafkaFuture> allFuture = result.all(); + assertDoesNotThrow(() -> allFuture.get()); + assertFalse(allFuture.isCompletedExceptionally()); } } diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java index bc49c3a09b7..6ce91baf123 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java @@ -23,6 +23,7 @@ import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.GroupState; import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.GroupIdNotFoundException; import org.apache.kafka.common.errors.UnknownMemberIdException; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.connect.data.Schema; @@ -301,6 +302,7 @@ public class MirrorCheckpointTask extends SourceTask { // sync offset to the target cluster only if the state of current consumer group is: // (1) idle: because the consumer at target is not actively consuming the mirrored topic // (2) dead: the new consumer that is recently created at source and never existed at target + // This case will be reported as a GroupIdNotFoundException if (consumerGroupState == GroupState.EMPTY) { idleConsumerGroupsOffset.put( group, @@ -311,8 +313,13 @@ public class MirrorCheckpointTask extends SourceTask { ); } // new consumer upstream has state "DEAD" and will be identified during the offset sync-up - } catch (InterruptedException | ExecutionException e) { - log.error("Error querying for consumer group {} on cluster {}.", group, targetClusterAlias, e); + } catch (InterruptedException ie) { + log.error("Error querying for consumer group {} on cluster {}.", group, targetClusterAlias, ie); + } catch (ExecutionException ee) { + // check for non-existent new consumer upstream which will be identified during the offset sync-up + if (!(ee.getCause() instanceof GroupIdNotFoundException)) { + log.error("Error querying for consumer group {} on cluster {}.", group, targetClusterAlias, ee); + } } } } diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala index 8ca956daedf..47684bc753b 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala @@ -1138,15 +1138,20 @@ private[group] class GroupCoordinator( } } - def handleDescribeGroup(groupId: String): (Errors, GroupSummary) = { + def handleDescribeGroup(groupId: String, apiVersion: Short): (Errors, Option[String], GroupSummary) = { validateGroupStatus(groupId, ApiKeys.DESCRIBE_GROUPS) match { - case Some(error) => (error, GroupCoordinator.EmptyGroup) + case Some(error) => (error, None, GroupCoordinator.EmptyGroup) case None => groupManager.getGroup(groupId) match { - case None => (Errors.NONE, GroupCoordinator.DeadGroup) + case None => + if (apiVersion >= 6) { + (Errors.GROUP_ID_NOT_FOUND, Some(s"Group $groupId not found."), GroupCoordinator.DeadGroup) + } else { + (Errors.NONE, None, GroupCoordinator.DeadGroup) + } case Some(group) => group.inLock { - (Errors.NONE, group.summary) + (Errors.NONE, None, group.summary) } } } diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala index 970d283953e..d8d47c46116 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala @@ -249,10 +249,11 @@ private[group] class GroupCoordinatorAdapter( ): CompletableFuture[util.List[DescribeGroupsResponseData.DescribedGroup]] = { def describeGroup(groupId: String): DescribeGroupsResponseData.DescribedGroup = { - val (error, summary) = coordinator.handleDescribeGroup(groupId) + val (error, errorMessage, summary) = coordinator.handleDescribeGroup(groupId, context.apiVersion()) new DescribeGroupsResponseData.DescribedGroup() .setErrorCode(error.code) + .setErrorMessage(errorMessage.orNull) .setGroupId(groupId) .setGroupState(summary.state) .setProtocolType(summary.protocolType) diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index 016cfcad122..5ae424f2137 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -1610,7 +1610,8 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { createTopicWithBrokerPrincipal(topic) addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, DESCRIBE, ALLOW)), groupResource) addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, DESCRIBE, ALLOW)), topicResource) - createAdminClient().describeConsumerGroups(Seq(group).asJava).describedGroups().get(group).get() + val result = createAdminClient().describeConsumerGroups(Seq(group).asJava) + JTestUtils.assertFutureThrows(result.describedGroups().get(group), classOf[GroupIdNotFoundException]) } @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) diff --git a/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala index 04146e12685..84214a79ed9 100644 --- a/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala @@ -19,6 +19,7 @@ import org.apache.kafka.common.test.api.ClusterTestExtensions import kafka.utils.TestUtils import org.apache.kafka.clients.admin.{Admin, ConsumerGroupDescription} import org.apache.kafka.clients.consumer.{Consumer, GroupProtocol, OffsetAndMetadata} +import org.apache.kafka.common.errors.GroupIdNotFoundException import org.apache.kafka.common.{ConsumerGroupState, GroupType, KafkaFuture, TopicPartition} import org.junit.jupiter.api.Assertions._ @@ -221,7 +222,7 @@ class GroupCoordinatorIntegrationTest(cluster: ClusterInstance) { .asScala .toMap - assertDescribedGroup(groups, "grp3", GroupType.CLASSIC, ConsumerGroupState.DEAD) + assertDescribedDeadGroup(groups, "grp3") } } @@ -328,4 +329,18 @@ class GroupCoordinatorIntegrationTest(cluster: ClusterInstance) { assertEquals(state, group.state) assertEquals(Collections.emptyList, group.members) } + + private def assertDescribedDeadGroup( + groups: Map[String, KafkaFuture[ConsumerGroupDescription]], + groupId: String + ): Unit = { + try { + groups(groupId).get(10, TimeUnit.SECONDS) + fail(s"Group $groupId should not be found") + } catch { + case e: java.util.concurrent.ExecutionException => + assertTrue(e.getCause.isInstanceOf[GroupIdNotFoundException]) + assertEquals(s"Group $groupId not found.", e.getCause.getMessage) + } + } } diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala index b4d06d9f993..c53dfef850e 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala @@ -1936,18 +1936,14 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { val expectedOperations = AclEntry.supportedOperations(ResourceType.GROUP) assertEquals(expectedOperations, testGroupDescription.authorizedOperations()) - // Test that the fake group is listed as dead. + // Test that the fake group throws GroupIdNotFoundException assertTrue(describeWithFakeGroupResult.describedGroups().containsKey(fakeGroupId)) - val fakeGroupDescription = describeWithFakeGroupResult.describedGroups().get(fakeGroupId).get() + assertFutureThrows(describeWithFakeGroupResult.describedGroups().get(fakeGroupId), classOf[GroupIdNotFoundException], + s"Group $fakeGroupId not found.") - assertEquals(fakeGroupId, fakeGroupDescription.groupId()) - assertEquals(0, fakeGroupDescription.members().size()) - assertEquals("", fakeGroupDescription.partitionAssignor()) - assertEquals(GroupState.DEAD, fakeGroupDescription.groupState()) - assertEquals(expectedOperations, fakeGroupDescription.authorizedOperations()) - - // Test that all() returns 2 results - assertEquals(2, describeWithFakeGroupResult.all().get().size()) + // Test that all() also throws GroupIdNotFoundException + assertFutureThrows(describeWithFakeGroupResult.all(), classOf[GroupIdNotFoundException], + s"Group $fakeGroupId not found.") val testTopicPart0 = new TopicPartition(testTopicName, 0) @@ -2209,18 +2205,14 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { val expectedOperations = AclEntry.supportedOperations(ResourceType.GROUP) assertEquals(expectedOperations, testGroupDescription.authorizedOperations()) - // Test that the fake group is listed as dead. + // Test that the fake group throws GroupIdNotFoundException assertTrue(describeWithFakeGroupResult.describedGroups().containsKey(fakeGroupId)) - val fakeGroupDescription = describeWithFakeGroupResult.describedGroups().get(fakeGroupId).get() + assertFutureThrows(describeWithFakeGroupResult.describedGroups().get(fakeGroupId), + classOf[GroupIdNotFoundException], s"Group $fakeGroupId not found.") - assertEquals(fakeGroupId, fakeGroupDescription.groupId()) - assertEquals(0, fakeGroupDescription.members().size()) - assertEquals("", fakeGroupDescription.partitionAssignor()) - assertEquals(ConsumerGroupState.DEAD, fakeGroupDescription.state()) - assertEquals(expectedOperations, fakeGroupDescription.authorizedOperations()) - - // Test that all() returns 2 results - assertEquals(2, describeWithFakeGroupResult.all().get().size()) + // Test that all() also throws GroupIdNotFoundException + assertFutureThrows(describeWithFakeGroupResult.all(), + classOf[GroupIdNotFoundException], s"Group $fakeGroupId not found.") val testTopicPart0 = new TopicPartition(testTopicName, 0) @@ -2642,17 +2634,14 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { val expectedOperations = AclEntry.supportedOperations(ResourceType.GROUP) assertEquals(expectedOperations, testGroupDescription.authorizedOperations()) - // Test that the fake group is listed as dead. + // Test that the fake group throws GroupIdNotFoundException assertTrue(describeWithFakeGroupResult.describedGroups().containsKey(fakeGroupId)) - val fakeGroupDescription = describeWithFakeGroupResult.describedGroups().get(fakeGroupId).get() + assertFutureThrows(describeWithFakeGroupResult.describedGroups().get(fakeGroupId), + classOf[GroupIdNotFoundException], s"Group $fakeGroupId not found.") - assertEquals(fakeGroupId, fakeGroupDescription.groupId()) - assertEquals(0, fakeGroupDescription.members().size()) - assertEquals(GroupState.DEAD, fakeGroupDescription.groupState()) - assertNull(fakeGroupDescription.authorizedOperations()) - - // Test that all() returns 2 results - assertEquals(2, describeWithFakeGroupResult.all().get().size()) + // Test that all() also throws GroupIdNotFoundException + assertFutureThrows(describeWithFakeGroupResult.all(), + classOf[GroupIdNotFoundException], s"Group $fakeGroupId not found.") val describeTestGroupResult = client.describeShareGroups(Collections.singleton(testGroupId), new DescribeShareGroupsOptions().includeAuthorizedOperations(true)) @@ -2664,18 +2653,12 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { assertEquals(testGroupId, testGroupDescription.groupId) assertEquals(consumerSet.size, testGroupDescription.members().size()) - // Describing a share group using describeConsumerGroups reports it as a DEAD consumer group - // in the same way as a non-existent group + // Describing a share group using describeConsumerGroups reports it as a non-existent group + // but the error message is different val describeConsumerGroupResult = client.describeConsumerGroups(Collections.singleton(testGroupId), new DescribeConsumerGroupsOptions().includeAuthorizedOperations(true)) - assertEquals(1, describeConsumerGroupResult.all().get().size()) - - val deadConsumerGroupDescription = describeConsumerGroupResult.describedGroups().get(testGroupId).get() - assertEquals(testGroupId, deadConsumerGroupDescription.groupId()) - assertEquals(0, deadConsumerGroupDescription.members().size()) - assertEquals("", deadConsumerGroupDescription.partitionAssignor()) - assertEquals(ConsumerGroupState.DEAD, deadConsumerGroupDescription.state()) - assertEquals(expectedOperations, deadConsumerGroupDescription.authorizedOperations()) + assertFutureThrows(describeConsumerGroupResult.all(), + classOf[GroupIdNotFoundException], s"Group $testGroupId is not a consumer group.") } finally { consumerThreads.foreach { case consumerThread => diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala index 7a9de453740..8d631fe2b80 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala @@ -414,12 +414,12 @@ class GroupCoordinatorAdapterTest { )) ) - when(groupCoordinator.handleDescribeGroup(groupId1)).thenReturn { - (Errors.NONE, groupSummary1) + when(groupCoordinator.handleDescribeGroup(groupId1, ApiKeys.DESCRIBE_GROUPS.latestVersion)).thenReturn { + (Errors.NONE, None, groupSummary1) } - when(groupCoordinator.handleDescribeGroup(groupId2)).thenReturn { - (Errors.NOT_COORDINATOR, GroupCoordinator.EmptyGroup) + when(groupCoordinator.handleDescribeGroup(groupId2, ApiKeys.DESCRIBE_GROUPS.latestVersion)).thenReturn { + (Errors.NOT_COORDINATOR, None, GroupCoordinator.EmptyGroup) } val ctx = makeContext(ApiKeys.DESCRIBE_GROUPS, ApiKeys.DESCRIBE_GROUPS.latestVersion) diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala index df0e3483f5a..433047275e2 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala @@ -175,7 +175,7 @@ class GroupCoordinatorTest { assertEquals(Some(Errors.NONE), heartbeatError) // DescribeGroups - val (describeGroupError, _) = groupCoordinator.handleDescribeGroup(otherGroupId) + val (describeGroupError, _, _) = groupCoordinator.handleDescribeGroup(otherGroupId, ApiKeys.DESCRIBE_GROUPS.latestVersion) assertEquals(Errors.COORDINATOR_LOAD_IN_PROGRESS, describeGroupError) // ListGroups @@ -187,15 +187,16 @@ class GroupCoordinatorTest { assertEquals(Some(Errors.COORDINATOR_LOAD_IN_PROGRESS), deleteGroupsErrors.get(otherGroupId)) // Check that non-loading groups are still accessible - assertEquals(Errors.NONE, groupCoordinator.handleDescribeGroup(groupId)._1) + assertEquals(Errors.GROUP_ID_NOT_FOUND, groupCoordinator.handleDescribeGroup(groupId, ApiKeys.DESCRIBE_GROUPS.latestVersion)._1) // After loading, we should be able to access the group val otherGroupMetadataTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, otherGroupPartitionId) when(replicaManager.getLog(otherGroupMetadataTopicPartition)).thenReturn(None) + // Call removeGroupsAndOffsets so that partition removed from loadingPartitions groupCoordinator.groupManager.removeGroupsAndOffsets(otherGroupMetadataTopicPartition, OptionalInt.of(1), group => {}) groupCoordinator.groupManager.loadGroupsAndOffsets(otherGroupMetadataTopicPartition, 1, group => {}, 0L) - assertEquals(Errors.NONE, groupCoordinator.handleDescribeGroup(otherGroupId)._1) + assertEquals(Errors.GROUP_ID_NOT_FOUND, groupCoordinator.handleDescribeGroup(otherGroupId, ApiKeys.DESCRIBE_GROUPS.latestVersion)._1) } @Test @@ -2609,8 +2610,9 @@ class GroupCoordinatorTest { assertEquals(Errors.NONE, fetchError) assertEquals(Some(0), partitionData.get(tip.topicPartition).map(_.offset)) - val (describeError, summary) = groupCoordinator.handleDescribeGroup(groupId) + var (describeError, describeErrorMessage, summary) = groupCoordinator.handleDescribeGroup(groupId, ApiKeys.DESCRIBE_GROUPS.latestVersion) assertEquals(Errors.NONE, describeError) + assertTrue(describeErrorMessage.isEmpty) assertEquals(Empty.toString, summary.state) val groupTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId) @@ -3405,15 +3407,21 @@ class GroupCoordinatorTest { @Test def testDescribeGroupWrongCoordinator(): Unit = { - val (error, _) = groupCoordinator.handleDescribeGroup(otherGroupId) + val (error, _, _) = groupCoordinator.handleDescribeGroup(otherGroupId, ApiKeys.DESCRIBE_GROUPS.latestVersion) assertEquals(Errors.NOT_COORDINATOR, error) } @Test def testDescribeGroupInactiveGroup(): Unit = { - val (error, summary) = groupCoordinator.handleDescribeGroup(groupId) + val (error, errorMessage, summary) = groupCoordinator.handleDescribeGroup(groupId, 5) assertEquals(Errors.NONE, error) + assertTrue(errorMessage.isEmpty) assertEquals(GroupCoordinator.DeadGroup, summary) + + val (errorV6, errorMessageV6, summaryV6) = groupCoordinator.handleDescribeGroup(groupId, 6) + assertEquals(Errors.GROUP_ID_NOT_FOUND, errorV6) + assertEquals(s"Group $groupId not found.", errorMessageV6.get) + assertEquals(GroupCoordinator.DeadGroup, summaryV6) } @Test @@ -3427,8 +3435,9 @@ class GroupCoordinatorTest { val syncGroupResult = syncGroupLeader(groupId, generationId, assignedMemberId, Map(assignedMemberId -> Array[Byte]())) assertEquals(Errors.NONE, syncGroupResult.error) - val (error, summary) = groupCoordinator.handleDescribeGroup(groupId) + val (error, errorMessage, summary) = groupCoordinator.handleDescribeGroup(groupId, ApiKeys.DESCRIBE_GROUPS.latestVersion) assertEquals(Errors.NONE, error) + assertTrue(errorMessage.isEmpty) assertEquals(protocolType, summary.protocolType) assertEquals("range", summary.protocol) assertEquals(List(assignedMemberId), summary.members.map(_.memberId)) @@ -3445,8 +3454,9 @@ class GroupCoordinatorTest { val syncGroupResult = syncGroupLeader(groupId, generationId, assignedMemberId, Map(assignedMemberId -> Array[Byte]())) assertEquals(Errors.NONE, syncGroupResult.error) - val (error, summary) = groupCoordinator.handleDescribeGroup(groupId) + val (error, errorMessage, summary) = groupCoordinator.handleDescribeGroup(groupId, ApiKeys.DESCRIBE_GROUPS.latestVersion) assertEquals(Errors.NONE, error) + assertTrue(errorMessage.isEmpty) assertEquals(protocolType, summary.protocolType) assertEquals("range", summary.protocol) assertEquals(List(assignedMemberId), summary.members.map(_.memberId)) @@ -3460,8 +3470,9 @@ class GroupCoordinatorTest { val joinGroupError = joinGroupResult.error assertEquals(Errors.NONE, joinGroupError) - val (error, summary) = groupCoordinator.handleDescribeGroup(groupId) + val (error, errorMessage, summary) = groupCoordinator.handleDescribeGroup(groupId, ApiKeys.DESCRIBE_GROUPS.latestVersion) assertEquals(Errors.NONE, error) + assertTrue(errorMessage.isEmpty) assertEquals(protocolType, summary.protocolType) assertEquals(GroupCoordinator.NoProtocol, summary.protocol) assertEquals(CompletingRebalance.toString, summary.state) @@ -3528,9 +3539,9 @@ class GroupCoordinatorTest { val commitOffsetResult = commitOffsets(groupId, assignedMemberId, joinGroupResult.generationId, Map(tip -> offset)) assertEquals(Map(tip -> Errors.NONE), commitOffsetResult) - val describeGroupResult = groupCoordinator.handleDescribeGroup(groupId) - assertEquals(Stable.toString, describeGroupResult._2.state) - assertEquals(assignedMemberId, describeGroupResult._2.members.head.memberId) + val describeGroupResult = groupCoordinator.handleDescribeGroup(groupId, ApiKeys.DESCRIBE_GROUPS.latestVersion) + assertEquals(Stable.toString, describeGroupResult._3.state) + assertEquals(assignedMemberId, describeGroupResult._3.members.head.memberId) val leaveGroupResults = singleLeaveGroup(groupId, assignedMemberId) verifyLeaveGroupResult(leaveGroupResults) @@ -3545,7 +3556,7 @@ class GroupCoordinatorTest { val result = groupCoordinator.handleDeleteGroups(Set(groupId)) assert(result.size == 1 && result.contains(groupId) && result.get(groupId).contains(Errors.NONE)) - assertEquals(Dead.toString, groupCoordinator.handleDescribeGroup(groupId)._2.state) + assertEquals(Dead.toString, groupCoordinator.handleDescribeGroup(groupId, ApiKeys.DESCRIBE_GROUPS.latestVersion)._3.state) } @Test diff --git a/core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala b/core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala index 2485f09409d..0fab872363b 100644 --- a/core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala @@ -123,6 +123,8 @@ class DeleteGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinator List(new DescribedGroup() .setGroupId("grp") .setGroupState(ClassicGroupState.DEAD.toString) + .setErrorCode(Errors.GROUP_ID_NOT_FOUND.code) + .setErrorMessage("Group grp not found.") ), describeGroups(List("grp")) ) diff --git a/core/src/test/scala/unit/kafka/server/DescribeGroupsRequestTest.scala b/core/src/test/scala/unit/kafka/server/DescribeGroupsRequestTest.scala index 4a822048e1c..de8044ce2c1 100644 --- a/core/src/test/scala/unit/kafka/server/DescribeGroupsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/DescribeGroupsRequestTest.scala @@ -20,7 +20,7 @@ import org.apache.kafka.common.test.api.ClusterInstance import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type} import org.apache.kafka.common.test.api.ClusterTestExtensions import org.apache.kafka.common.message.DescribeGroupsResponseData.{DescribedGroup, DescribedGroupMember} -import org.apache.kafka.common.protocol.ApiKeys +import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.coordinator.group.GroupCoordinatorConfig import org.apache.kafka.coordinator.group.classic.ClassicGroupState import org.junit.jupiter.api.Assertions.assertEquals @@ -106,6 +106,8 @@ class DescribeGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinat new DescribedGroup() .setGroupId("grp-unknown") .setGroupState(ClassicGroupState.DEAD.toString) // Return DEAD group when the group does not exist. + .setErrorCode(if (version >= 6) Errors.GROUP_ID_NOT_FOUND.code() else Errors.NONE.code()) + .setErrorMessage(if (version >= 6) "Group grp-unknown not found." else null) ), describeGroups( groupIds = List("grp-1", "grp-2", "grp-unknown"), diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 5771a17d5ea..f856caf9cfe 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -3719,7 +3719,8 @@ class KafkaApisTest extends Logging { val describeGroupsRequest = new DescribeGroupsRequestData().setGroups(List( "group-1", "group-2", - "group-3" + "group-3", + "group-4" ).asJava) val requestChannelRequest = buildRequest(new DescribeGroupsRequest.Builder(describeGroupsRequest).build()) @@ -3746,7 +3747,12 @@ class KafkaApisTest extends Logging { .setErrorCode(Errors.NOT_COORDINATOR.code), new DescribeGroupsResponseData.DescribedGroup() .setGroupId("group-3") - .setErrorCode(Errors.REQUEST_TIMED_OUT.code) + .setErrorCode(Errors.REQUEST_TIMED_OUT.code), + new DescribeGroupsResponseData.DescribedGroup() + .setGroupId("group-4") + .setGroupState("Dead") + .setErrorCode(Errors.GROUP_ID_NOT_FOUND.code) + .setErrorMessage("Group group-4 is not a classic group.") ).asJava future.complete(groupResults) diff --git a/docs/upgrade.html b/docs/upgrade.html index 8e2aa2915f4..84b2b36ebc1 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -161,12 +161,19 @@ -
  • Admin +
  • Admin client
    • - The alterConfigs method was removed from the org.apache.kafka.clients.admin.Admin + The alterConfigs method was removed from the org.apache.kafka.clients.admin.Admin. Please use incrementalAlterConfigs instead.
    • +
    • The org.apache.kafka.common.ConsumerGroupState enumeration and related methods have been deprecated. Please use GroupState instead + which applies to all types of group. +
    • +
    • The Admin.describeConsumerGroups method used to return a ConsumerGroupDescription in state + DEAD if the group ID was not found. In Apache Kafka 4.0, the GroupIdNotFoundException + is thrown instead as part of the support for new types of group. +
  • @@ -185,10 +192,11 @@ See KIP-750 for more details
  • - KafkaLog4jAppender has been remove, users should migrate to the log4j2 appender + KafkaLog4jAppender has been removed, users should migrate to the log4j2 appender See KafkaAppender for more details
  • -
  • The --delete-config option in the kafka-topics command line tool has been deprecated. +
  • + The --delete-config option in the kafka-topics command line tool has been deprecated.
  • For implementors of RemoteLogMetadataManager (RLMM), a new API `nextSegmentWithTxnIndex` is diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java index 72a4ae58553..a2ead8effa0 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java @@ -603,7 +603,7 @@ public class GroupCoordinatorShard implements CoordinatorShard groupIds, long committedOffset ) { - return groupMetadataManager.describeGroups(groupIds, committedOffset); + return groupMetadataManager.describeGroups(context, groupIds, committedOffset); } /** diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index 69e7955d0bf..dda3a41d0bf 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -521,6 +521,7 @@ public class GroupMetadataManager { describedGroups.add(new ConsumerGroupDescribeResponseData.DescribedGroup() .setGroupId(groupId) .setErrorCode(Errors.GROUP_ID_NOT_FOUND.code()) + .setErrorMessage(exception.getMessage()) ); } }); @@ -552,6 +553,7 @@ public class GroupMetadataManager { describedGroups.add(new ShareGroupDescribeResponseData.DescribedGroup() .setGroupId(groupId) .setErrorCode(Errors.GROUP_ID_NOT_FOUND.code()) + .setErrorMessage(exception.getMessage()) ); } }); @@ -562,12 +564,14 @@ public class GroupMetadataManager { /** * Handles a DescribeGroup request. * + * @param context The request context. * @param groupIds The IDs of the groups to describe. * @param committedOffset A specified committed offset corresponding to this shard. * * @return A list containing the DescribeGroupsResponseData.DescribedGroup. */ public List describeGroups( + RequestContext context, List groupIds, long committedOffset ) { @@ -603,10 +607,19 @@ public class GroupMetadataManager { ); } } catch (GroupIdNotFoundException exception) { - describedGroups.add(new DescribeGroupsResponseData.DescribedGroup() - .setGroupId(groupId) - .setGroupState(DEAD.toString()) - ); + if (context.header.apiVersion() >= 6) { + describedGroups.add(new DescribeGroupsResponseData.DescribedGroup() + .setGroupId(groupId) + .setGroupState(DEAD.toString()) + .setErrorCode(Errors.GROUP_ID_NOT_FOUND.code()) + .setErrorMessage(exception.getMessage()) + ); + } else { + describedGroups.add(new DescribeGroupsResponseData.DescribedGroup() + .setGroupId(groupId) + .setGroupState(DEAD.toString()) + ); + } } }); return describedGroups; @@ -647,7 +660,7 @@ public class GroupMetadataManager { } else if (createIfNotExists && group.type() == CLASSIC && validateOnlineUpgrade((ClassicGroup) group)) { return convertToConsumerGroup((ClassicGroup) group, records); } else { - throw new GroupIdNotFoundException(String.format("Group %s is not a consumer group", groupId)); + throw new GroupIdNotFoundException(String.format("Group %s is not a consumer group.", groupId)); } } } @@ -670,7 +683,7 @@ public class GroupMetadataManager { if (group.type() == CONSUMER) { return (ConsumerGroup) group; } else { - throw new GroupIdNotFoundException(String.format("Group %s is not a consumer group", groupId)); + throw new GroupIdNotFoundException(String.format("Group %s is not a consumer group.", groupId)); } } @@ -704,7 +717,7 @@ public class GroupMetadataManager { Group group = groups.get(groupId); if (group == null && !createIfNotExists) { - throw new GroupIdNotFoundException(String.format("Consumer group %s not found", groupId)); + throw new GroupIdNotFoundException(String.format("Consumer group %s not found.", groupId)); } if (group == null) { diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index f6391b5bd61..97ba6a5873a 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -8699,7 +8699,8 @@ public class GroupMetadataManagerTest { List actual = context.sendConsumerGroupDescribe(List.of(groupId)); ConsumerGroupDescribeResponseData.DescribedGroup describedGroup = new ConsumerGroupDescribeResponseData.DescribedGroup() .setGroupId(groupId) - .setErrorCode(Errors.GROUP_ID_NOT_FOUND.code()); + .setErrorCode(Errors.GROUP_ID_NOT_FOUND.code()) + .setErrorMessage("Group " + groupId + " not found."); List expected = List.of( describedGroup ); @@ -8741,7 +8742,8 @@ public class GroupMetadataManagerTest { List actual = context.groupMetadataManager.consumerGroupDescribe(List.of(consumerGroupId), context.lastCommittedOffset); ConsumerGroupDescribeResponseData.DescribedGroup describedGroup = new ConsumerGroupDescribeResponseData.DescribedGroup() .setGroupId(consumerGroupId) - .setErrorCode(Errors.GROUP_ID_NOT_FOUND.code()); + .setErrorCode(Errors.GROUP_ID_NOT_FOUND.code()) + .setErrorMessage("Group " + consumerGroupId + " not found."); List expected = List.of( describedGroup ); @@ -8873,6 +8875,13 @@ public class GroupMetadataManagerTest { context.verifyDescribeGroupsReturnsDeadGroup("group-id"); } + @Test + public void testDescribeGroupsBeforeV6GroupIdNotFoundException() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + context.verifyDescribeGroupsBeforeV6ReturnsDeadGroup("group-id"); + } + @Test public void testGroupStuckInRebalanceTimeoutDueToNonjoinedStaticMember() throws Exception { GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() @@ -14976,6 +14985,7 @@ public class GroupMetadataManagerTest { new ConsumerGroupDescribeResponseData.DescribedGroup() .setGroupId(groupId) .setErrorCode(Errors.GROUP_ID_NOT_FOUND.code()) + .setErrorMessage("Group " + groupId + " is not a consumer group.") ); List actual = context.sendConsumerGroupDescribe(List.of(groupId)); @@ -15048,6 +15058,7 @@ public class GroupMetadataManagerTest { new ShareGroupDescribeResponseData.DescribedGroup() .setGroupId(groupId) .setErrorCode(Errors.GROUP_ID_NOT_FOUND.code()) + .setErrorMessage("Group " + groupId + " is not a share group.") ); List actual = context.sendShareGroupDescribe(List.of(groupId)); diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java index 1caa54325e4..0c930f29862 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java @@ -1270,7 +1270,43 @@ public class GroupMetadataManagerTestContext { } public List describeGroups(List groupIds) { - return groupMetadataManager.describeGroups(groupIds, lastCommittedOffset); + RequestContext context = new RequestContext( + new RequestHeader( + ApiKeys.DESCRIBE_GROUPS, + ApiKeys.DESCRIBE_GROUPS.latestVersion(), + DEFAULT_CLIENT_ID, + 0 + ), + "1", + DEFAULT_CLIENT_ADDRESS, + KafkaPrincipal.ANONYMOUS, + ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), + SecurityProtocol.PLAINTEXT, + ClientInformation.EMPTY, + false + ); + + return groupMetadataManager.describeGroups(context, groupIds, lastCommittedOffset); + } + + public List describeGroups(List groupIds, short apiVersion) { + RequestContext context = new RequestContext( + new RequestHeader( + ApiKeys.DESCRIBE_GROUPS, + apiVersion, + DEFAULT_CLIENT_ID, + 0 + ), + "1", + DEFAULT_CLIENT_ADDRESS, + KafkaPrincipal.ANONYMOUS, + ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), + SecurityProtocol.PLAINTEXT, + ClientInformation.EMPTY, + false + ); + + return groupMetadataManager.describeGroups(context, groupIds, lastCommittedOffset); } public List sendShareGroupDescribe(List groupIds) { @@ -1390,6 +1426,21 @@ public class GroupMetadataManagerTestContext { List.of(new DescribeGroupsResponseData.DescribedGroup() .setGroupId(groupId) .setGroupState(DEAD.toString()) + .setErrorCode(Errors.GROUP_ID_NOT_FOUND.code()) + .setErrorMessage("Group " + groupId + " not found.") + ), + describedGroups + ); + } + + public void verifyDescribeGroupsBeforeV6ReturnsDeadGroup(String groupId) { + List describedGroups = + describeGroups(Collections.singletonList(groupId), (short) 5); + + assertEquals( + Collections.singletonList(new DescribeGroupsResponseData.DescribedGroup() + .setGroupId(groupId) + .setGroupState(DEAD.toString()) ), describedGroups ); diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java index 2d66e0fd86b..6fbdfeaf79b 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java @@ -29,6 +29,7 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.Metric; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.GroupIdNotFoundException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.utils.Time; @@ -1002,7 +1003,9 @@ public class IntegrationTestUtils { .get(applicationId) .get(); return groupDescription.members().isEmpty(); - } catch (final ExecutionException | InterruptedException e) { + } catch (final ExecutionException e) { + return e.getCause() instanceof GroupIdNotFoundException; + } catch (final InterruptedException e) { return false; } } diff --git a/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java b/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java index 526d9661928..a100110517e 100644 --- a/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java +++ b/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java @@ -29,6 +29,7 @@ import org.apache.kafka.clients.consumer.OffsetAndTimestamp; import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.annotation.InterfaceStability; +import org.apache.kafka.common.errors.GroupIdNotFoundException; import org.apache.kafka.common.requests.ListOffsetsResponse; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.utils.Exit; @@ -170,17 +171,24 @@ public class StreamsResetter { final DescribeConsumerGroupsResult describeResult = adminClient.describeConsumerGroups( Collections.singleton(groupId), new DescribeConsumerGroupsOptions().timeoutMs(10 * 1000)); - final List members = - new ArrayList<>(describeResult.describedGroups().get(groupId).get().members()); - if (!members.isEmpty()) { - if (options.hasForce()) { - System.out.println("Force deleting all active members in the group: " + groupId); - adminClient.removeMembersFromConsumerGroup(groupId, new RemoveMembersFromConsumerGroupOptions()).all().get(); - } else { - throw new IllegalStateException("Consumer group '" + groupId + "' is still active " + try { + final List members = + new ArrayList<>(describeResult.describedGroups().get(groupId).get().members()); + if (!members.isEmpty()) { + if (options.hasForce()) { + System.out.println("Force deleting all active members in the group: " + groupId); + adminClient.removeMembersFromConsumerGroup(groupId, new RemoveMembersFromConsumerGroupOptions()).all().get(); + } else { + throw new IllegalStateException("Consumer group '" + groupId + "' is still active " + "and has following members: " + members + ". " + "Make sure to stop all running application instances before running the reset tool." + " You can use option '--force' to remove active members from the group."); + } + } + } catch (ExecutionException ee) { + // If the group ID is not found, this is not an error case + if (!(ee.getCause() instanceof GroupIdNotFoundException)) { + throw ee; } } } diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java index 33e483cbb94..26a93e0a58c 100644 --- a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java @@ -44,6 +44,7 @@ import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.GroupIdNotFoundException; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.ListOffsetsResponse; import org.apache.kafka.common.utils.Utils; @@ -569,34 +570,54 @@ public class ConsumerGroupCommand { switch (state) { case "Empty": case "Dead": - Collection partitionsToReset = getPartitionsToReset(groupId); - Map preparedOffsets = prepareOffsetsToReset(groupId, partitionsToReset); - - // Dry-run is the default behavior if --execute is not specified - boolean dryRun = opts.options.has(opts.dryRunOpt) || !opts.options.has(opts.executeOpt); - if (!dryRun) { - adminClient.alterConsumerGroupOffsets( - groupId, - preparedOffsets, - withTimeoutMs(new AlterConsumerGroupOffsetsOptions()) - ).all().get(); - } - - result.put(groupId, preparedOffsets); - + result.put(groupId, resetOffsetsForInactiveGroup(groupId)); break; default: printError("Assignments can only be reset if the group '" + groupId + "' is inactive, but the current state is " + state + ".", Optional.empty()); result.put(groupId, Collections.emptyMap()); } - } catch (InterruptedException | ExecutionException e) { - throw new RuntimeException(e); + } catch (InterruptedException ie) { + throw new RuntimeException(ie); + } catch (ExecutionException ee) { + if (ee.getCause() instanceof GroupIdNotFoundException) { + result.put(groupId, resetOffsetsForInactiveGroup(groupId)); + } else { + throw new RuntimeException(ee); + } } }); return result; } + private Map resetOffsetsForInactiveGroup(String groupId) { + try { + Collection partitionsToReset = getPartitionsToReset(groupId); + Map preparedOffsets = prepareOffsetsToReset(groupId, partitionsToReset); + + // Dry-run is the default behavior if --execute is not specified + boolean dryRun = opts.options.has(opts.dryRunOpt) || !opts.options.has(opts.executeOpt); + if (!dryRun) { + adminClient.alterConsumerGroupOffsets( + groupId, + preparedOffsets, + withTimeoutMs(new AlterConsumerGroupOffsetsOptions()) + ).all().get(); + } + + return preparedOffsets; + } catch (InterruptedException ie) { + throw new RuntimeException(ie); + } catch (ExecutionException ee) { + Throwable cause = ee.getCause(); + if (cause instanceof KafkaException) { + throw (KafkaException) cause; + } else { + throw new RuntimeException(cause); + } + } + } + Entry> deleteOffsets(String groupId, List topics) { Map partitionLevelResult = new HashMap<>(); Set topicWithPartitions = new HashSet<>(); @@ -702,7 +723,7 @@ public class ConsumerGroupCommand { System.out.printf(format, tp.topic(), tp.partition() >= 0 ? tp.partition() : "Not Provided", - error != null ? "Error: :" + error.getMessage() : "Successful" + error != null ? "Error: " + error.getMessage() : "Successful" ); }); System.out.println(); @@ -1231,8 +1252,10 @@ public class ConsumerGroupCommand { try { f.get(); success.put(g, null); - } catch (ExecutionException | InterruptedException e) { - failed.put(g, e); + } catch (InterruptedException ie) { + failed.put(g, ie); + } catch (ExecutionException e) { + failed.put(g, e.getCause()); } }); diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandOptions.java b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandOptions.java index 6289b3a3099..48d51622e9c 100644 --- a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandOptions.java +++ b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandOptions.java @@ -257,7 +257,7 @@ public class ConsumerGroupCommandOptions extends CommandDefaultOptions { CommandLineUtils.printUsageAndExit(parser, "Option " + resetOffsetsOpt + " only accepts one of " + executeOpt + " and " + dryRunOpt); if (!options.has(dryRunOpt) && !options.has(executeOpt)) { - System.err.println("WARN: No action will be performed as the --execute option is missing." + + System.err.println("WARN: No action will be performed as the --execute option is missing. " + "In a future major release, the default behavior of this command will be to prompt the user before " + "executing the reset rather than doing a dry run. You should add the --dry-run option explicitly " + "if you are scripting this command and want to keep the current default behavior without prompting."); diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/AuthorizerIntegrationTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/AuthorizerIntegrationTest.java index e815002899b..36073f26dcc 100644 --- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/AuthorizerIntegrationTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/AuthorizerIntegrationTest.java @@ -19,16 +19,20 @@ package org.apache.kafka.tools.consumer.group; import kafka.api.AbstractAuthorizerIntegrationTest; import org.apache.kafka.common.acl.AccessControlEntry; +import org.apache.kafka.common.errors.GroupIdNotFoundException; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; import java.util.Collections; +import java.util.concurrent.ExecutionException; import scala.jdk.javaapi.CollectionConverters; import static org.apache.kafka.common.acl.AclOperation.DESCRIBE; import static org.apache.kafka.common.acl.AclPermissionType.ALLOW; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.fail; public class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { @ParameterizedTest @@ -38,8 +42,12 @@ public class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", group()}; ConsumerGroupCommandOptions opts = ConsumerGroupCommandOptions.fromArgs(cgcArgs); - ConsumerGroupCommand.ConsumerGroupService consumerGroupService = new ConsumerGroupCommand.ConsumerGroupService(opts, Collections.emptyMap()); - consumerGroupService.describeGroups(); - consumerGroupService.close(); + try (ConsumerGroupCommand.ConsumerGroupService consumerGroupService = new ConsumerGroupCommand.ConsumerGroupService(opts, Collections.emptyMap())) { + consumerGroupService.describeGroups(); + fail("Non-existent group should throw an exception"); + } catch (ExecutionException e) { + assertInstanceOf(GroupIdNotFoundException.class, e.getCause(), + "Non-existent group should throw GroupIdNotFoundException"); + } } } diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java index 3af38c44806..e353aa0b016 100644 --- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java @@ -100,7 +100,7 @@ public class DeleteConsumerGroupsTest { assertEquals(1, result.size()); assertNotNull(result.get(missingGroupId)); assertInstanceOf(GroupIdNotFoundException.class, - result.get(missingGroupId).getCause(), + result.get(missingGroupId), "The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was not detected while deleting consumer group"); } } @@ -132,7 +132,7 @@ public class DeleteConsumerGroupsTest { assertEquals(1, result.size()); assertNotNull(result.get(groupId)); assertInstanceOf(GroupNotEmptyException.class, - result.get(groupId).getCause(), + result.get(groupId), "The expected error (" + Errors.NON_EMPTY_GROUP + ") was not detected while deleting consumer group. Result was:(" + result + ")"); } } diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java index 57ec0efbcb3..dd20003eb4e 100644 --- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java @@ -28,6 +28,7 @@ import org.apache.kafka.clients.consumer.RangeAssignor; import org.apache.kafka.clients.consumer.RoundRobinAssignor; import org.apache.kafka.common.GroupState; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.GroupIdNotFoundException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.test.api.ClusterConfig; @@ -68,6 +69,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; @ExtendWith(value = ClusterTestExtensions.class) public class DescribeConsumerGroupTest { @@ -92,9 +94,13 @@ public class DescribeConsumerGroupTest { List cgcArgs = new ArrayList<>(Arrays.asList("--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", missingGroup)); cgcArgs.addAll(describeType); try (ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(cgcArgs.toArray(new String[0]))) { - String output = ToolsTestUtils.grabConsoleOutput(describeGroups(service)); - assertTrue(output.contains("Consumer group '" + missingGroup + "' does not exist."), - "Expected error was not detected for describe option '" + String.join(" ", describeType) + "'"); + service.describeGroups(); + fail("Expected error was not detected for describe option '" + String.join(" ", describeType) + "'"); + } catch (ExecutionException ee) { + assertInstanceOf(GroupIdNotFoundException.class, ee.getCause()); + assertEquals("Group " + missingGroup + " not found.", ee.getCause().getMessage()); + } catch (Exception e) { + fail("Expected error was not detected for describe option '" + String.join(" ", describeType) + "'"); } } } @@ -113,9 +119,11 @@ public class DescribeConsumerGroupTest { // note the group to be queried is a different (non-existing) group ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", missingGroup}) ) { - Entry, Optional>> res = service.collectGroupOffsets(missingGroup); - assertTrue(res.getKey().map(s -> s.equals(GroupState.DEAD)).orElse(false) && res.getValue().map(Collection::isEmpty).orElse(false), - "Expected the state to be 'Dead', with no members in the group '" + missingGroup + "'."); + service.collectGroupOffsets(missingGroup); + fail("Expected the group '" + missingGroup + "' to throw GroupIdNotFoundException"); + } catch (ExecutionException ee) { + assertInstanceOf(GroupIdNotFoundException.class, ee.getCause(), + "Expected the group '" + missingGroup + "' to throw GroupIdNotFoundException"); } } } @@ -132,13 +140,11 @@ public class DescribeConsumerGroupTest { // note the group to be queried is a different (non-existing) group ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", missingGroup}) ) { - Entry, Optional>> res = service.collectGroupMembers(missingGroup, false); - assertTrue(res.getKey().map(s -> s.equals(GroupState.DEAD)).orElse(false) && res.getValue().map(Collection::isEmpty).orElse(false), - "Expected the state to be 'Dead', with no members in the group '" + missingGroup + "'."); - - Entry, Optional>> res2 = service.collectGroupMembers(missingGroup, true); - assertTrue(res2.getKey().map(s -> s.equals(GroupState.DEAD)).orElse(false) && res2.getValue().map(Collection::isEmpty).orElse(false), - "Expected the state to be 'Dead', with no members in the group '" + missingGroup + "' (verbose option)."); + service.collectGroupMembers(missingGroup, false); + fail("Expected the group '" + missingGroup + "' to throw GroupIdNotFoundException"); + } catch (ExecutionException ee) { + assertInstanceOf(GroupIdNotFoundException.class, ee.getCause(), + "Expected the group '" + missingGroup + "' to throw GroupIdNotFoundException"); } } } @@ -155,11 +161,11 @@ public class DescribeConsumerGroupTest { // note the group to be queried is a different (non-existing) group ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", missingGroup}) ) { - GroupInformation state = service.collectGroupState(missingGroup); - assertTrue(Objects.equals(state.groupState, GroupState.DEAD) && state.numMembers == 0 && - state.coordinator != null && clusterInstance.brokerIds().contains(state.coordinator.id()), - "Expected the state to be 'Dead', with no members in the group '" + missingGroup + "'." - ); + service.collectGroupState(missingGroup); + fail("Expected the group '" + missingGroup + "' to throw GroupIdNotFoundException"); + } catch (ExecutionException ee) { + assertInstanceOf(GroupIdNotFoundException.class, ee.getCause(), + "Expected the group '" + missingGroup + "' to throw GroupIdNotFoundException"); } } } diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java index 5aa73dae2de..a49597d638f 100644 --- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java @@ -136,10 +136,6 @@ public class ResetConsumerGroupOffsetTest { String[] args = buildArgsForGroup(cluster, group, "--all-topics", "--to-current", "--execute"); try (ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(args)) { - // Make sure we got a coordinator - TestUtils.waitForCondition( - () -> "localhost".equals(service.collectGroupState(group).coordinator.host()), - "Can't find a coordinator"); Map resetOffsets = service.resetOffsets().get(group); assertTrue(resetOffsets.isEmpty()); assertTrue(committedOffsets(cluster, topic, group).isEmpty());